diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 798718bc0..fac662ba1 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -47,7 +47,7 @@ use crate::handlers::http::modal::ingest::SyncRole; use crate::handlers::http::query::{Query, QueryError, TIME_ELAPSED_HEADER}; use crate::metrics::prom_utils::Metrics; use crate::option::Mode; -use crate::parseable::PARSEABLE; +use crate::parseable::{DEFAULT_TENANT, PARSEABLE}; use crate::rbac::role::model::Role; use crate::rbac::user::User; use crate::stats::Stats; @@ -85,6 +85,8 @@ pub struct BillingMetricEvent { pub provider: Option, #[serde(skip_serializing_if = "Option::is_none")] pub model: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub tenant_id: Option, pub event_type: String, pub event_time: chrono::NaiveDateTime, } @@ -92,6 +94,7 @@ pub struct BillingMetricEvent { // Internal structure for collecting metrics from prometheus #[derive(Debug, Default)] struct BillingMetricsCollector { + pub tenant_id: Option, pub node_address: String, pub node_type: String, pub total_events_ingested_by_date: HashMap, @@ -116,8 +119,9 @@ struct BillingMetricsCollector { } impl BillingMetricsCollector { - pub fn new(node_address: String, node_type: String) -> Self { + pub fn new(node_address: String, node_type: String, tenant_id: Option) -> Self { Self { + tenant_id, node_address, node_type, event_time: Utc::now().naive_utc(), @@ -153,6 +157,7 @@ impl BillingMetricsCollector { method: None, provider: None, model: None, + tenant_id: self.tenant_id.clone(), event_type: "billing-metrics".to_string(), event_time: self.event_time, }); @@ -282,6 +287,7 @@ impl BillingMetricsCollector { method: Some(method.clone()), provider: None, model: None, + tenant_id: self.tenant_id.clone(), event_type: "billing-metrics".to_string(), event_time: self.event_time, }); @@ -320,6 +326,7 @@ impl BillingMetricsCollector { method: None, provider: Some(provider.clone()), model: Some(model.clone()), + tenant_id: self.tenant_id.clone(), event_type: "billing-metrics".to_string(), event_time: self.event_time, }); @@ -1382,16 +1389,31 @@ fn extract_billing_metrics_from_samples( node_address: String, node_type: String, ) -> Vec { - let mut collector = BillingMetricsCollector::new(node_address, node_type); + // Group samples by tenant_id so each tenant gets its own collector + let mut collectors: HashMap, BillingMetricsCollector> = HashMap::new(); for sample in samples { if let prometheus_parse::Value::Counter(val) = sample.value { - process_sample(&mut collector, &sample, val); + // Extract tenant_id from labels; treat "DEFAULT_TENANT" or absent as None (single-tenant compat) + let tenant_id = sample + .labels + .get("tenant_id") + .filter(|t| *t != DEFAULT_TENANT) + .map(|t| t.to_string()); + + let collector = collectors.entry(tenant_id.clone()).or_insert_with(|| { + BillingMetricsCollector::new(node_address.clone(), node_type.clone(), tenant_id) + }); + + process_sample(collector, &sample, val); } } - // Convert to flattened events, excluding empty collections - collector.into_events() + // Convert all collectors to flattened events + collectors + .into_values() + .flat_map(|c| c.into_events()) + .collect() } /// Process a single prometheus sample and update the collector diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs index fc1c51a69..73b6615d3 100644 --- a/src/storage/store_metadata.rs +++ b/src/storage/store_metadata.rs @@ -70,6 +70,14 @@ pub struct StorageMetadata { pub default_role: Option, pub suspended_services: Option>, pub global_query_auth: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub customer_name: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub start_date: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub end_date: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub plan: Option, } impl Default for StorageMetadata { @@ -88,6 +96,10 @@ impl Default for StorageMetadata { default_role: None, suspended_services: None, global_query_auth: None, + customer_name: None, + start_date: None, + end_date: None, + plan: None, } } } diff --git a/src/tenants/mod.rs b/src/tenants/mod.rs index a6ab439e2..62926cb30 100644 --- a/src/tenants/mod.rs +++ b/src/tenants/mod.rs @@ -59,6 +59,29 @@ impl TenantMetadata { ); } + pub fn get_tenant_meta(&self, tenant_id: &str) -> Option { + self.tenants.get(tenant_id).map(|t| t.meta.clone()) + } + + pub fn update_tenant_meta( + &self, + tenant_id: &str, + customer_name: Option, + start_date: Option, + end_date: Option, + plan: Option, + ) -> bool { + if let Some(mut tenant) = self.tenants.get_mut(tenant_id) { + tenant.meta.customer_name = customer_name; + tenant.meta.start_date = start_date; + tenant.meta.end_date = end_date; + tenant.meta.plan = plan; + true + } else { + false + } + } + pub fn get_global_query_auth(&self, tenant_id: &str) -> Option { if let Some(tenant) = self.tenants.get(tenant_id) { tenant.meta.global_query_auth.clone()