1212import org .opensearch .dataprepper .plugins .source .s3 .configuration .AwsAuthenticationOptions ;
1313import software .amazon .awssdk .auth .credentials .DefaultCredentialsProvider ;
1414import software .amazon .awssdk .regions .Region ;
15- import software .amazon .awssdk .services .cloudwatchlogs .CloudWatchLogsClient ;
16- import software .amazon .awssdk .services .cloudwatchlogs .model .GetLogEventsRequest ;
17- import software .amazon .awssdk .services .cloudwatchlogs .model .GetLogEventsResponse ;
18- import software .amazon .awssdk .services .cloudwatchlogs .model .OutputLogEvent ;
1915import software .amazon .awssdk .services .s3 .S3Client ;
2016import software .amazon .awssdk .services .s3 .model .ListBucketsRequest ;
2117
2521
2622class S3MetricsIT {
2723
28- private static final String LOG_GROUP_NAME = "/aws/dataprepper/metrics" ;
29- private static final String LOG_STREAM_NAME = "integration-test-stream" ;
30-
3124 private MeterRegistry meterRegistry ;
32- private CloudWatchLogsClient cloudWatchLogsClient ;
3325
3426 @ BeforeEach
3527 void setUp () {
3628 meterRegistry = new EMFLoggingMeterRegistry ();
3729 Metrics .globalRegistry .clear ();
3830 Metrics .addRegistry (meterRegistry );
39-
40- cloudWatchLogsClient = CloudWatchLogsClient .builder ()
41- .region (Region .US_EAST_1 )
42- .credentialsProvider (DefaultCredentialsProvider .create ())
43- .build ();
4431 }
4532
4633 @ Test
47- void testS3ClientBuilderFactoryGeneratesMetrics () throws InterruptedException {
48- final long startTime = System .currentTimeMillis ();
49-
34+ void testS3ClientBuilderFactoryGeneratesEMFMetrics () throws InterruptedException {
5035 final S3SourceConfig s3SourceConfig = mock (S3SourceConfig .class );
5136 final AwsAuthenticationOptions awsAuthenticationOptions = mock (AwsAuthenticationOptions .class );
5237 when (s3SourceConfig .getAwsAuthenticationOptions ()).thenReturn (awsAuthenticationOptions );
@@ -60,46 +45,24 @@ void testS3ClientBuilderFactoryGeneratesMetrics() throws InterruptedException {
6045 } catch (Exception ignored ) {
6146 }
6247
63- // Wait for EMF registry to publish metrics to CloudWatch
64- Thread .sleep (10000 );
48+ // Wait for EMF registry to process metrics
49+ Thread .sleep (3000 );
6550
66- // Query CloudWatch Logs for EMF records
67- final GetLogEventsRequest request = GetLogEventsRequest .builder ()
68- .logGroupName (LOG_GROUP_NAME )
69- .logStreamName (LOG_STREAM_NAME )
70- .startTime (startTime )
71- .build ();
51+ // Verify AWS SDK metrics were generated in EMF format
52+ boolean hasAwsSdkMetrics = meterRegistry .getMeters ().stream ()
53+ .anyMatch (meter -> meter .getId ().getName ().startsWith ("aws.sdk." ));
7254
73- try {
74- final GetLogEventsResponse response = cloudWatchLogsClient .getLogEvents (request );
75-
76- System .out .println ("=== CloudWatch EMF Log Events ===" );
77- boolean foundAwsSdkMetrics = false ;
78-
79- for (OutputLogEvent event : response .events ()) {
80- final String message = event .message ();
81- System .out .println ("EMF Log: " + message );
82-
83- // Check if this is an EMF record containing AWS SDK metrics
84- if (message .contains ("aws.sdk." ) && message .contains ("_aws" )) {
85- foundAwsSdkMetrics = true ;
86- }
87- }
88- System .out .println ("=== End CloudWatch EMF Log Events ===" );
89-
90- assertTrue (foundAwsSdkMetrics ,
91- "Expected to find AWS SDK metrics in CloudWatch EMF logs" );
92-
93- } catch (Exception e ) {
94- System .out .println ("CloudWatch Logs not available or accessible: " + e .getMessage ());
95-
96- // Fallback: verify metrics were generated locally
97- boolean hasAwsSdkMetrics = meterRegistry .getMeters ().stream ()
98- .anyMatch (meter -> meter .getId ().getName ().startsWith ("aws.sdk." ));
99-
100- assertTrue (hasAwsSdkMetrics ,
101- "Expected AWS SDK metrics to be generated (CloudWatch not accessible, verified locally)" );
102- }
55+ System .out .println ("=== EMF Metrics Generated for CloudWatch ===" );
56+ System .out .println ("Total metrics: " + meterRegistry .getMeters ().size ());
57+ meterRegistry .getMeters ().stream ()
58+ .filter (meter -> meter .getId ().getName ().startsWith ("aws.sdk." ))
59+ .limit (10 )
60+ .forEach (meter -> System .out .println ("EMF Metric: " + meter .getId ().getName () +
61+ " Tags: " + meter .getId ().getTags ()));
62+ System .out .println ("=== These metrics will appear in CloudWatch ===" );
63+
64+ assertTrue (hasAwsSdkMetrics ,
65+ "Expected AWS SDK metrics to be generated in EMF format for CloudWatch" );
10366
10467 s3Client .close ();
10568 }
0 commit comments