@@ -65,10 +65,25 @@ async fn sets_create_action_when_configured() {
6565 assert_eq ! ( encoded. len( ) , encoded_size) ;
6666}
6767
68- fn data_stream_body ( ) -> BTreeMap < String , Value > {
68+ fn data_stream_body (
69+ dtype : Option < String > ,
70+ dataset : Option < String > ,
71+ namespace : Option < String > ,
72+ ) -> BTreeMap < String , Value > {
6973 let mut ds = BTreeMap :: < String , Value > :: new ( ) ;
70- ds. insert ( "type" . into ( ) , Value :: from ( "synthetics" ) ) ;
71- ds. insert ( "dataset" . into ( ) , Value :: from ( "testing" ) ) ;
74+
75+ if let Some ( dtype) = dtype {
76+ ds. insert ( "type" . into ( ) , Value :: from ( dtype) ) ;
77+ }
78+
79+ if let Some ( dataset) = dataset {
80+ ds. insert ( "dataset" . into ( ) , Value :: from ( dataset) ) ;
81+ }
82+
83+ if let Some ( namespace) = namespace {
84+ ds. insert ( "namespace" . into ( ) , Value :: from ( namespace) ) ;
85+ }
86+
7287 ds
7388}
7489
@@ -100,7 +115,14 @@ async fn encode_datastream_mode() {
100115 . single ( )
101116 . expect ( "invalid timestamp" ) ,
102117 ) ;
103- log. insert ( "data_stream" , data_stream_body ( ) ) ;
118+ log. insert (
119+ "data_stream" ,
120+ data_stream_body (
121+ Some ( "synthetics" . to_string ( ) ) ,
122+ Some ( "testing" . to_string ( ) ) ,
123+ None ,
124+ ) ,
125+ ) ;
104126
105127 let mut encoded = vec ! [ ] ;
106128 let ( encoded_size, _json_size) = es
@@ -143,7 +165,14 @@ async fn encode_datastream_mode_no_routing() {
143165 let es = ElasticsearchCommon :: parse_single ( & config) . await . unwrap ( ) ;
144166
145167 let mut log = LogEvent :: from ( "hello there" ) ;
146- log. insert ( "data_stream" , data_stream_body ( ) ) ;
168+ log. insert (
169+ "data_stream" ,
170+ data_stream_body (
171+ Some ( "synthetics" . to_string ( ) ) ,
172+ Some ( "testing" . to_string ( ) ) ,
173+ None ,
174+ ) ,
175+ ) ;
147176 log. insert (
148177 (
149178 lookup:: PathPrefix :: Event ,
@@ -287,7 +316,14 @@ async fn encode_datastream_mode_no_sync() {
287316 let es = ElasticsearchCommon :: parse_single ( & config) . await . unwrap ( ) ;
288317
289318 let mut log = LogEvent :: from ( "hello there" ) ;
290- log. insert ( "data_stream" , data_stream_body ( ) ) ;
319+ log. insert (
320+ "data_stream" ,
321+ data_stream_body (
322+ Some ( "synthetics" . to_string ( ) ) ,
323+ Some ( "testing" . to_string ( ) ) ,
324+ None ,
325+ ) ,
326+ ) ;
291327 log. insert (
292328 (
293329 lookup:: PathPrefix :: Event ,
@@ -389,3 +425,122 @@ async fn allows_using_only_fields() {
389425 assert_eq ! ( std:: str :: from_utf8( & encoded) . unwrap( ) , expected) ;
390426 assert_eq ! ( encoded. len( ) , encoded_size) ;
391427}
428+
429+ #[ tokio:: test]
430+ async fn datastream_index_name ( ) {
431+ #[ derive( Clone , Debug ) ]
432+ struct TestCase {
433+ dtype : Option < String > ,
434+ namespace : Option < String > ,
435+ dataset : Option < String > ,
436+ want : String ,
437+ }
438+
439+ let config = ElasticsearchConfig {
440+ bulk : BulkConfig {
441+ index : parse_template ( "vector" ) ,
442+ ..Default :: default ( )
443+ } ,
444+ endpoints : vec ! [ String :: from( "https://example.com" ) ] ,
445+ mode : ElasticsearchMode :: DataStream ,
446+ api_version : ElasticsearchApiVersion :: V6 ,
447+ ..Default :: default ( )
448+ } ;
449+ let es = ElasticsearchCommon :: parse_single ( & config) . await . unwrap ( ) ;
450+
451+ let test_cases = [
452+ TestCase {
453+ dtype : Some ( "type" . to_string ( ) ) ,
454+ dataset : Some ( "dataset" . to_string ( ) ) ,
455+ namespace : Some ( "namespace" . to_string ( ) ) ,
456+ want : "type-dataset-namespace" . to_string ( ) ,
457+ } ,
458+ TestCase {
459+ dtype : Some ( "type" . to_string ( ) ) ,
460+ dataset : Some ( "" . to_string ( ) ) ,
461+ namespace : Some ( "namespace" . to_string ( ) ) ,
462+ want : "type-namespace" . to_string ( ) ,
463+ } ,
464+ TestCase {
465+ dtype : Some ( "type" . to_string ( ) ) ,
466+ dataset : None ,
467+ namespace : Some ( "namespace" . to_string ( ) ) ,
468+ want : "type-generic-namespace" . to_string ( ) ,
469+ } ,
470+ TestCase {
471+ dtype : Some ( "type" . to_string ( ) ) ,
472+ dataset : Some ( "" . to_string ( ) ) ,
473+ namespace : Some ( "" . to_string ( ) ) ,
474+ want : "type" . to_string ( ) ,
475+ } ,
476+ TestCase {
477+ dtype : Some ( "type" . to_string ( ) ) ,
478+ dataset : None ,
479+ namespace : None ,
480+ want : "type-generic-default" . to_string ( ) ,
481+ } ,
482+ TestCase {
483+ dtype : Some ( "" . to_string ( ) ) ,
484+ dataset : Some ( "" . to_string ( ) ) ,
485+ namespace : Some ( "" . to_string ( ) ) ,
486+ want : "" . to_string ( ) ,
487+ } ,
488+ TestCase {
489+ dtype : None ,
490+ dataset : None ,
491+ namespace : None ,
492+ want : "logs-generic-default" . to_string ( ) ,
493+ } ,
494+ TestCase {
495+ dtype : Some ( "" . to_string ( ) ) ,
496+ dataset : Some ( "dataset" . to_string ( ) ) ,
497+ namespace : Some ( "namespace" . to_string ( ) ) ,
498+ want : "dataset-namespace" . to_string ( ) ,
499+ } ,
500+ TestCase {
501+ dtype : None ,
502+ dataset : Some ( "dataset" . to_string ( ) ) ,
503+ namespace : Some ( "namespace" . to_string ( ) ) ,
504+ want : "logs-dataset-namespace" . to_string ( ) ,
505+ } ,
506+ TestCase {
507+ dtype : Some ( "" . to_string ( ) ) ,
508+ dataset : Some ( "" . to_string ( ) ) ,
509+ namespace : Some ( "namespace" . to_string ( ) ) ,
510+ want : "namespace" . to_string ( ) ,
511+ } ,
512+ TestCase {
513+ dtype : None ,
514+ dataset : None ,
515+ namespace : Some ( "namespace" . to_string ( ) ) ,
516+ want : "logs-generic-namespace" . to_string ( ) ,
517+ } ,
518+ TestCase {
519+ dtype : Some ( "" . to_string ( ) ) ,
520+ dataset : Some ( "dataset" . to_string ( ) ) ,
521+ namespace : Some ( "" . to_string ( ) ) ,
522+ want : "dataset" . to_string ( ) ,
523+ } ,
524+ TestCase {
525+ dtype : None ,
526+ dataset : Some ( "dataset" . to_string ( ) ) ,
527+ namespace : None ,
528+ want : "logs-dataset-default" . to_string ( ) ,
529+ } ,
530+ ] ;
531+
532+ for test_case in test_cases {
533+ let mut log = LogEvent :: from ( "hello there" ) ;
534+ log. insert (
535+ "data_stream" ,
536+ data_stream_body (
537+ test_case. dtype . clone ( ) ,
538+ test_case. dataset . clone ( ) ,
539+ test_case. namespace . clone ( ) ,
540+ ) ,
541+ ) ;
542+
543+ let processed_event = process_log ( log, & es. mode , & None , & config. encoding ) . unwrap ( ) ;
544+ assert_eq ! ( processed_event. index, test_case. want, "{test_case:?}" ) ;
545+ }
546+ }
0 commit comments