@@ -47,7 +47,7 @@ use crate::handlers::http::modal::ingest::SyncRole;
4747use crate :: handlers:: http:: query:: { Query , QueryError , TIME_ELAPSED_HEADER } ;
4848use crate :: metrics:: prom_utils:: Metrics ;
4949use crate :: option:: Mode ;
50- use crate :: parseable:: PARSEABLE ;
50+ use crate :: parseable:: { DEFAULT_TENANT , PARSEABLE } ;
5151use crate :: rbac:: role:: model:: Role ;
5252use crate :: rbac:: user:: User ;
5353use crate :: stats:: Stats ;
@@ -85,13 +85,16 @@ pub struct BillingMetricEvent {
8585 pub provider : Option < String > ,
8686 #[ serde( skip_serializing_if = "Option::is_none" ) ]
8787 pub model : Option < String > ,
88+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
89+ pub tenant_id : Option < String > ,
8890 pub event_type : String ,
8991 pub event_time : chrono:: NaiveDateTime ,
9092}
9193
9294// Internal structure for collecting metrics from prometheus
9395#[ derive( Debug , Default ) ]
9496struct BillingMetricsCollector {
97+ pub tenant_id : Option < String > ,
9598 pub node_address : String ,
9699 pub node_type : String ,
97100 pub total_events_ingested_by_date : HashMap < String , u64 > ,
@@ -116,8 +119,9 @@ struct BillingMetricsCollector {
116119}
117120
118121impl BillingMetricsCollector {
119- pub fn new ( node_address : String , node_type : String ) -> Self {
122+ pub fn new ( node_address : String , node_type : String , tenant_id : Option < String > ) -> Self {
120123 Self {
124+ tenant_id,
121125 node_address,
122126 node_type,
123127 event_time : Utc :: now ( ) . naive_utc ( ) ,
@@ -153,6 +157,7 @@ impl BillingMetricsCollector {
153157 method : None ,
154158 provider : None ,
155159 model : None ,
160+ tenant_id : self . tenant_id . clone ( ) ,
156161 event_type : "billing-metrics" . to_string ( ) ,
157162 event_time : self . event_time ,
158163 } ) ;
@@ -282,6 +287,7 @@ impl BillingMetricsCollector {
282287 method : Some ( method. clone ( ) ) ,
283288 provider : None ,
284289 model : None ,
290+ tenant_id : self . tenant_id . clone ( ) ,
285291 event_type : "billing-metrics" . to_string ( ) ,
286292 event_time : self . event_time ,
287293 } ) ;
@@ -320,6 +326,7 @@ impl BillingMetricsCollector {
320326 method : None ,
321327 provider : Some ( provider. clone ( ) ) ,
322328 model : Some ( model. clone ( ) ) ,
329+ tenant_id : self . tenant_id . clone ( ) ,
323330 event_type : "billing-metrics" . to_string ( ) ,
324331 event_time : self . event_time ,
325332 } ) ;
@@ -1382,16 +1389,31 @@ fn extract_billing_metrics_from_samples(
13821389 node_address : String ,
13831390 node_type : String ,
13841391) -> Vec < BillingMetricEvent > {
1385- let mut collector = BillingMetricsCollector :: new ( node_address, node_type) ;
1392+ // Group samples by tenant_id so each tenant gets its own collector
1393+ let mut collectors: HashMap < Option < String > , BillingMetricsCollector > = HashMap :: new ( ) ;
13861394
13871395 for sample in samples {
13881396 if let prometheus_parse:: Value :: Counter ( val) = sample. value {
1389- process_sample ( & mut collector, & sample, val) ;
1397+ // Extract tenant_id from labels; treat "DEFAULT_TENANT" or absent as None (single-tenant compat)
1398+ let tenant_id = sample
1399+ . labels
1400+ . get ( "tenant_id" )
1401+ . filter ( |t| * t != DEFAULT_TENANT )
1402+ . map ( |t| t. to_string ( ) ) ;
1403+
1404+ let collector = collectors. entry ( tenant_id. clone ( ) ) . or_insert_with ( || {
1405+ BillingMetricsCollector :: new ( node_address. clone ( ) , node_type. clone ( ) , tenant_id)
1406+ } ) ;
1407+
1408+ process_sample ( collector, & sample, val) ;
13901409 }
13911410 }
13921411
1393- // Convert to flattened events, excluding empty collections
1394- collector. into_events ( )
1412+ // Convert all collectors to flattened events
1413+ collectors
1414+ . into_values ( )
1415+ . flat_map ( |c| c. into_events ( ) )
1416+ . collect ( )
13951417}
13961418
13971419/// Process a single prometheus sample and update the collector
0 commit comments