diff --git a/Cargo.toml b/Cargo.toml index 7bbd792..9ec6d59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ publish = false [dependencies] async-trait = "0.1.89" -clap = { version = "4.6", features = ["derive"] } +clap = { version = "4.6", features = ["derive", "env"] } dirs = "6" exn = "0.3" humantime = "2" diff --git a/src/application/mod.rs b/src/application/mod.rs index fb59a98..49c7dab 100644 --- a/src/application/mod.rs +++ b/src/application/mod.rs @@ -1,3 +1,6 @@ pub mod ports; pub mod service; pub mod updater_service; + +#[cfg(test)] +mod test_support; diff --git a/src/application/service.rs b/src/application/service.rs index f6d176d..ae14acb 100644 --- a/src/application/service.rs +++ b/src/application/service.rs @@ -19,6 +19,7 @@ use crate::domain::models::{ const DEFAULT_SEARCH_LIMIT: u64 = 50; const MAX_STREAM_SEARCH_LIMIT: u64 = 100; +const MAX_ALL_PAGES_MESSAGES: usize = 10_000; const DEFAULT_SEARCH_OFFSET: u64 = 0; const DEFAULT_SEARCH_SORT: &str = "timestamp"; @@ -89,16 +90,7 @@ impl ApplicationService { let mut input = input; if input.all_fields && input.fields.is_empty() { - let config = self - .config_store - .load() - .await - .or_raise(|| CliError::Config("failed to load runtime config".to_string()))? - .ok_or_else(|| { - CliError::Config( - "graylog is not configured, run `graylog-cli auth` first".to_string(), - ) - })?; + let config = self.require_config().await?; let ttl = config.graylog.fields_cache_ttl_seconds; let cache_key = "fields".to_string(); let now = SystemTime::now() @@ -381,16 +373,7 @@ impl ApplicationService { command: &'static str, request: MessageSearchRequest, ) -> exn::Result { - let config = self - .config_store - .load() - .await - .or_raise(|| CliError::Config("failed to load runtime config".to_string()))? - .ok_or_else(|| { - CliError::Config( - "graylog is not configured, run `graylog-cli auth` first".to_string(), - ) - })?; + let config = self.require_config().await?; let client = self.graylog_gateway_with_config(config.graylog)?; let result = client.search_messages(request.clone()).await.or_raise(|| { CliError::Http(HttpError::Unavailable { @@ -419,16 +402,7 @@ impl ApplicationService { &self, input: &SearchCommandInput, ) -> exn::Result { - let config = self - .config_store - .load() - .await - .or_raise(|| CliError::Config("failed to load runtime config".to_string()))? - .ok_or_else(|| { - CliError::Config( - "graylog is not configured, run `graylog-cli auth` first".to_string(), - ) - })?; + let config = self.require_config().await?; let client = self.graylog_gateway_with_config(config.graylog)?; let mut request = self.build_search_request(input.clone(), DEFAULT_SEARCH_LIMIT); let mut all_messages = Vec::new(); @@ -456,6 +430,10 @@ impl ApplicationService { let fetched = result.messages.len(); all_messages.extend(result.messages); + if all_messages.len() >= MAX_ALL_PAGES_MESSAGES { + break; + } + request.offset += fetched as u64; if fetched == 0 { @@ -535,16 +513,7 @@ impl ApplicationService { command: &'static str, request: AggregateSearchRequest, ) -> exn::Result { - let config = self - .config_store - .load() - .await - .or_raise(|| CliError::Config("failed to load runtime config".to_string()))? - .ok_or_else(|| { - CliError::Config( - "graylog is not configured, run `graylog-cli auth` first".to_string(), - ) - })?; + let config = self.require_config().await?; let client = self.graylog_gateway_with_config(config.graylog)?; let aggregation_type = request.aggregation_type.as_cli_value(); let result = client.search_aggregate(request).await.or_raise(|| { @@ -562,9 +531,8 @@ impl ApplicationService { }) } - async fn graylog_gateway(&self) -> exn::Result, CliError> { - let config = self - .config_store + async fn require_config(&self) -> exn::Result { + self.config_store .load() .await .or_raise(|| CliError::Config("failed to load runtime config".to_string()))? @@ -572,8 +540,12 @@ impl ApplicationService { CliError::Config( "graylog is not configured, run `graylog-cli auth` first".to_string(), ) - })?; + }) + .map_err(Into::into) + } + async fn graylog_gateway(&self) -> exn::Result, CliError> { + let config = self.require_config().await?; self.graylog_gateway_with_config(config.graylog) } @@ -639,14 +611,13 @@ fn parse_timestamp_to_millis(ts: &str) -> Option { mod tests { use super::*; - use std::collections::HashMap; use std::sync::Mutex; use async_trait::async_trait; use serde_json::{Map, Value}; - use crate::application::ports::cache_store::CacheError; use crate::application::ports::config_store::ConfigError; + use crate::application::test_support::fakes::FakeCacheStore; use crate::domain::config::{ DEFAULT_FIELDS_CACHE_TTL_SECONDS, DEFAULT_TIMEOUT_SECONDS, UpdaterConfig, }; @@ -701,48 +672,6 @@ mod tests { } } - #[derive(Clone, Default)] - struct FakeCacheStore { - storage: Arc>>, - } - - impl FakeCacheStore { - fn get(&self, key: &str) -> Option { - self.storage - .lock() - .expect("cache mutex should not be poisoned") - .get(key) - .cloned() - } - - fn insert(&self, key: &str, value: String) { - self.storage - .lock() - .expect("cache mutex should not be poisoned") - .insert(key.to_string(), value); - } - } - - #[async_trait] - impl CacheStore for FakeCacheStore { - async fn get_serialized(&self, key: &str) -> exn::Result, CacheError> { - Ok(self - .storage - .lock() - .expect("cache mutex should not be poisoned") - .get(key) - .cloned()) - } - - async fn save_serialized(&self, key: String, data: String) -> exn::Result<(), CacheError> { - self.storage - .lock() - .expect("cache mutex should not be poisoned") - .insert(key, data); - Ok(()) - } - } - #[derive(Clone)] struct FakeGraylogGateway { search_results: Arc>>, diff --git a/src/application/test_support.rs b/src/application/test_support.rs new file mode 100644 index 0000000..0a5b09d --- /dev/null +++ b/src/application/test_support.rs @@ -0,0 +1,51 @@ +#[cfg(test)] +pub(crate) mod fakes { + use std::collections::HashMap; + use std::sync::{Arc, Mutex}; + + use async_trait::async_trait; + + use crate::application::ports::cache_store::{CacheError, CacheStore}; + + #[derive(Clone, Default)] + pub(crate) struct FakeCacheStore { + storage: Arc>>, + } + + impl FakeCacheStore { + pub(crate) fn get(&self, key: &str) -> Option { + self.storage + .lock() + .expect("cache mutex should not be poisoned") + .get(key) + .cloned() + } + + pub(crate) fn insert(&self, key: &str, value: String) { + self.storage + .lock() + .expect("cache mutex should not be poisoned") + .insert(key.to_string(), value); + } + } + + #[async_trait] + impl CacheStore for FakeCacheStore { + async fn get_serialized(&self, key: &str) -> exn::Result, CacheError> { + Ok(self + .storage + .lock() + .expect("cache mutex should not be poisoned") + .get(key) + .cloned()) + } + + async fn save_serialized(&self, key: String, data: String) -> exn::Result<(), CacheError> { + self.storage + .lock() + .expect("cache mutex should not be poisoned") + .insert(key, data); + Ok(()) + } + } +} diff --git a/src/application/updater_service.rs b/src/application/updater_service.rs index 26627c1..513af23 100644 --- a/src/application/updater_service.rs +++ b/src/application/updater_service.rs @@ -338,40 +338,14 @@ fn unix_now() -> u64 { mod tests { use super::*; - use std::collections::HashMap; use std::path::Path; use std::sync::Mutex; use async_trait::async_trait; use tempfile::TempDir; - use crate::application::ports::cache_store::CacheError; use crate::application::ports::updater::ReleaseInfo; - - #[derive(Default)] - struct FakeCacheStore { - entries: Mutex>, - } - - #[async_trait] - impl CacheStore for FakeCacheStore { - async fn get_serialized(&self, key: &str) -> exn::Result, CacheError> { - Ok(self - .entries - .lock() - .expect("cache mutex should not be poisoned") - .get(key) - .cloned()) - } - - async fn save_serialized(&self, key: String, data: String) -> exn::Result<(), CacheError> { - self.entries - .lock() - .expect("cache mutex should not be poisoned") - .insert(key, data); - Ok(()) - } - } + use crate::application::test_support::fakes::FakeCacheStore; struct FakeUpdater { release: Mutex, diff --git a/src/infrastructure/config_store.rs b/src/infrastructure/config_store.rs index a28e92a..f10b97b 100644 --- a/src/infrastructure/config_store.rs +++ b/src/infrastructure/config_store.rs @@ -103,7 +103,16 @@ impl CacheStore for FileConfigStore { std::fs::create_dir_all(parent) .map_err(|error| CacheError::OperationFailure(error.to_string()))?; std::fs::write(&cache_path, data) - .map_err(|error| CacheError::OperationFailure(error.to_string())) + .map_err(|error| CacheError::OperationFailure(error.to_string()))?; + + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let _ = + std::fs::set_permissions(&cache_path, std::fs::Permissions::from_mode(0o600)); + } + + Ok::<(), CacheError>(()) }) .await .map_err(|error| CacheError::StoreUnavailable(format!("failed to write cache: {error}")))? diff --git a/src/presentation/cli.rs b/src/presentation/cli.rs index 8ae0900..49f9ac8 100644 --- a/src/presentation/cli.rs +++ b/src/presentation/cli.rs @@ -82,12 +82,13 @@ pub struct AuthArgs { #[arg(short = 'u', long = "url", required = true)] pub url: String, /// Graylog access token. - #[arg(short = 't', long = "token", required = true)] + #[arg(short = 't', long = "token", required = true, env = "GRAYLOG_TOKEN")] pub token: String, } #[derive(Debug, Args)] pub struct SearchArgs { + #[arg(help = "Lucene search query")] pub query: String, #[command(flatten)] pub timerange: TimerangeArgs, @@ -131,10 +132,11 @@ impl SearchArgs { #[derive(Debug, Args)] pub struct AggregateArgs { + #[arg(help = "Lucene search query")] pub query: String, #[arg(long = "aggregation-type", value_enum)] pub aggregation_type: AggregationTypeArg, - #[arg(long = "field")] + #[arg(long = "field", help = "Field to aggregate on")] pub field: String, #[arg(long = "size", value_parser = clap::value_parser!(u64).range(1..=100))] pub size: Option, @@ -230,17 +232,21 @@ impl StreamsCommands { #[derive(Debug, Args)] pub struct StreamIdArgs { + #[arg(help = "Graylog stream ID")] pub stream_id: String, } #[derive(Debug, Args)] pub struct StreamNameArgs { + #[arg(help = "Stream name to search for")] pub name: String, } #[derive(Debug, Args)] pub struct StreamSearchArgs { + #[arg(help = "Graylog stream ID")] pub stream_id: String, + #[arg(help = "Lucene search query")] pub query: String, #[command(flatten)] pub timerange: TimerangeArgs, @@ -270,6 +276,7 @@ impl StreamSearchArgs { #[derive(Debug, Args)] pub struct StreamIdTimerangeArgs { + #[arg(help = "Graylog stream ID")] pub stream_id: String, #[command(flatten)] pub timerange: TimerangeArgs,