@@ -495,8 +495,9 @@ async fn insert_events_with_failure_and_gzip_compression() {
495495#[ tokio:: test]
496496async fn insert_events_in_data_stream ( ) {
497497 trace_init ( ) ;
498- let template_index = format ! ( "my-template-{}" , gen_index( ) ) ;
499- let stream_index = format ! ( "my-stream-{}" , gen_index( ) ) ;
498+ let index = gen_index ( ) ;
499+ let template_index = format ! ( "my-template-{}" , index) ;
500+ let stream_index = format ! ( "my-stream-{}" , index) ;
500501
501502 let cfg = ElasticsearchConfig {
502503 endpoints : vec ! [ http_server( ) ] ,
@@ -505,6 +506,10 @@ async fn insert_events_in_data_stream() {
505506 index : Template :: try_from ( stream_index. clone ( ) ) . expect ( "unable to parse template" ) ,
506507 ..Default :: default ( )
507508 } ,
509+ data_stream : Some ( DataStreamConfig {
510+ namespace : index,
511+ ..Default :: default ( )
512+ } ) ,
508513 batch : batch_settings ( ) ,
509514 ..Default :: default ( )
510515 } ;
@@ -635,14 +640,6 @@ async fn run_insert_tests_with_config(
635640 let common = ElasticsearchCommon :: parse_single ( config)
636641 . await
637642 . expect ( "Config error" ) ;
638- let index = match config. mode {
639- // Data stream mode uses an index name generated from the event.
640- ElasticsearchMode :: DataStream => format ! (
641- "{}" ,
642- Utc :: now( ) . format( ".ds-logs-generic-default-%Y.%m.%d-000001" )
643- ) ,
644- ElasticsearchMode :: Bulk => config. bulk . index . to_string ( ) ,
645- } ;
646643 let base_url = common. base_url . clone ( ) ;
647644
648645 let cx = SinkContext :: default ( ) ;
@@ -686,6 +683,15 @@ async fn run_insert_tests_with_config(
686683 // make sure writes are all visible
687684 flush ( common) . await . expect ( "Flushing writes failed" ) ;
688685
686+ let index = match config. mode {
687+ ElasticsearchMode :: DataStream => config
688+ . data_stream
689+ . as_ref ( )
690+ . map ( |ds| format ! ( "logs-generic-{}" , ds. namespace) )
691+ . unwrap ( ) ,
692+ ElasticsearchMode :: Bulk => config. bulk . index . to_string ( ) ,
693+ } ;
694+
689695 let client = create_http_client ( ) ;
690696 let mut response = client
691697 . get ( format ! ( "{base_url}/{index}/_search" ) )
0 commit comments