@@ -238,6 +238,261 @@ def test_normalize_outputs_invalid_type(self, mock_session):
238238 processor ._normalize_outputs (["invalid" ])
239239
240240
241+
242+
class TestBugConditionFileUriReplacedInLocalMode:
    """Exploration tests for the bug condition: ``file://`` output URIs in local mode.

    **Validates: Requirements 1.1, 1.2, 2.1, 2.2**

    These tests are EXPECTED TO FAIL on unfixed code; a failure confirms the bug.
    The bug is that ``_normalize_outputs()`` swaps ``file://`` URIs for ``s3://``
    paths even when the session is a LocalSession (``local_mode=True``).
    """

    @pytest.fixture
    def local_mock_session(self):
        """Return a mocked session whose ``local_mode`` attribute is True."""
        # Keyword arguments to Mock() are set as attributes on the new mock,
        # which keeps the whole session description in one expression.
        return Mock(
            boto_session=Mock(region_name="us-west-2"),
            sagemaker_client=Mock(),
            default_bucket=Mock(return_value="default-bucket"),
            default_bucket_prefix="prefix",
            expand_role=Mock(side_effect=lambda role: role),
            sagemaker_config={},
            local_mode=True,
        )

    @pytest.mark.parametrize(
        "file_uri",
        [
            "file:///tmp/output",
            "file:///home/user/results",
            "file:///data/processed",
        ],
    )
    def test_normalize_outputs_preserves_file_uri_in_local_mode(self, local_mock_session, file_uri):
        """A ``file://`` URI must come back untouched when ``local_mode=True``.

        On unfixed code, ``_normalize_outputs`` rewrites the URI to
        s3://default-bucket/prefix/job-name/output/output-1, which is the bug.
        """
        processor = Processor(
            role="arn:aws:iam::123456789012:role/SageMakerRole",
            image_uri="test-image:latest",
            instance_count=1,
            instance_type="ml.m5.xlarge",
            sagemaker_session=local_mock_session,
        )
        processor._current_job_name = "test-job"

        requested_output = ProcessingOutput(
            output_name="my-output",
            s3_output=ProcessingS3Output(
                s3_uri=file_uri,
                local_path="/opt/ml/processing/output",
                s3_upload_mode="EndOfJob",
            ),
        )

        # Force the non-pipeline code path so only the local-mode handling is exercised.
        with patch("sagemaker.core.workflow.utilities._pipeline_config", None):
            normalized = processor._normalize_outputs([requested_output])

        assert len(normalized) == 1
        actual_uri = normalized[0].s3_output.s3_uri
        assert actual_uri == file_uri, (
            f"Expected file:// URI to be preserved as '{file_uri}' in local mode, "
            f"but got '{actual_uri}'"
        )
304+
305+
class TestPreservationNonLocalFileBehavior:
    """Preservation property tests: non-local-file behavior must remain unchanged.

    **Validates: Requirements 3.1, 3.2, 3.3, 3.4**

    These tests capture baseline behavior on UNFIXED code. They MUST PASS on both
    unfixed and fixed code, confirming no regressions are introduced by the fix.
    """

    # --- Shared builders -------------------------------------------------

    @staticmethod
    def _make_session(local_mode):
        """Build a mocked session; only ``local_mode`` varies between tests."""
        return Mock(
            boto_session=Mock(region_name="us-west-2"),
            sagemaker_client=Mock(),
            default_bucket=Mock(return_value="default-bucket"),
            default_bucket_prefix="prefix",
            expand_role=Mock(side_effect=lambda role: role),
            sagemaker_config={},
            local_mode=local_mode,
        )

    @pytest.fixture
    def session_local_mode_true(self):
        """Mocked session with ``local_mode=True``."""
        return self._make_session(local_mode=True)

    @pytest.fixture
    def session_local_mode_false(self):
        """Mocked session with ``local_mode=False``."""
        return self._make_session(local_mode=False)

    def _make_processor(self, session):
        """Create a Processor bound to ``session`` with a fixed current job name."""
        processor = Processor(
            role="arn:aws:iam::123456789012:role/SageMakerRole",
            image_uri="test-image:latest",
            instance_count=1,
            instance_type="ml.m5.xlarge",
            sagemaker_session=session,
        )
        processor._current_job_name = "test-job"
        return processor

    @staticmethod
    def _single_output(s3_uri, output_name, local_path="/opt/ml/processing/output"):
        """Return a one-element outputs list wrapping ``s3_uri``."""
        return [
            ProcessingOutput(
                output_name=output_name,
                s3_output=ProcessingS3Output(
                    s3_uri=s3_uri,
                    local_path=local_path,
                    s3_upload_mode="EndOfJob",
                ),
            )
        ]

    @staticmethod
    def _normalize(processor, outputs):
        """Run ``_normalize_outputs`` with the pipeline config patched to None."""
        with patch("sagemaker.core.workflow.utilities._pipeline_config", None):
            return processor._normalize_outputs(outputs)

    # --- Requirement 3.1: S3 URIs pass through unchanged regardless of local_mode ---

    @pytest.mark.parametrize(
        "s3_uri,local_mode_fixture",
        [
            ("s3://my-bucket/path", "session_local_mode_true"),
            ("s3://my-bucket/path", "session_local_mode_false"),
            ("s3://another-bucket/deep/nested/path", "session_local_mode_true"),
            ("s3://another-bucket/deep/nested/path", "session_local_mode_false"),
        ],
    )
    def test_s3_uri_preserved_regardless_of_local_mode(self, s3_uri, local_mode_fixture, request):
        """S3 URIs must pass through unchanged regardless of local_mode setting.

        **Validates: Requirements 3.1**
        """
        # The fixture is looked up by name so one test covers both session kinds.
        session = request.getfixturevalue(local_mode_fixture)
        processor = self._make_processor(session)

        result = self._normalize(processor, self._single_output(s3_uri, "my-output"))

        assert len(result) == 1
        assert result[0].s3_output.s3_uri == s3_uri

    # --- Requirement 3.2: Non-S3 URIs with local_mode=False replaced with S3 paths ---

    @pytest.mark.parametrize(
        "non_s3_uri",
        [
            "/local/output/path",
            "http://example.com/output",
            "ftp://server/output",
        ],
    )
    def test_non_s3_uri_replaced_when_not_local_mode(self, non_s3_uri, session_local_mode_false):
        """Non-S3 URIs in non-local sessions are replaced with auto-generated S3 paths.

        **Validates: Requirements 3.2**
        """
        processor = self._make_processor(session_local_mode_false)

        result = self._normalize(processor, self._single_output(non_s3_uri, "output-1"))

        assert len(result) == 1
        assert result[0].s3_output.s3_uri.startswith("s3://default-bucket/")

    # --- Requirement 3.3: Pipeline variable URIs skip normalization ---

    def test_pipeline_variable_uri_skips_normalization(self, session_local_mode_false):
        """Pipeline variable URIs skip normalization entirely.

        **Validates: Requirements 3.3**
        """
        processor = self._make_processor(session_local_mode_false)
        # NOTE(review): the URI used here already has the s3:// scheme, and the
        # test above shows s3:// URIs pass through unchanged anyway. This
        # assertion may therefore hold even without the pipeline-variable
        # short-circuit; consider a non-S3 URI once the behavior of
        # _normalize_outputs for that case is confirmed.
        outputs = self._single_output("s3://bucket/output", "output-1")

        with patch("sagemaker.core.processing.is_pipeline_variable", return_value=True):
            result = processor._normalize_outputs(outputs)

        assert len(result) == 1
        # Pipeline variable outputs are appended as-is without URI modification
        assert result[0].s3_output.s3_uri == "s3://bucket/output"

    # --- Requirement 3.4: Non-ProcessingOutput objects raise TypeError ---

    @pytest.mark.parametrize(
        "invalid_output",
        [
            ["a string"],
            [42],
            [{"key": "value"}],
        ],
    )
    def test_non_processing_output_raises_type_error(self, invalid_output, session_local_mode_false):
        """Non-ProcessingOutput objects must raise TypeError.

        **Validates: Requirements 3.4**
        """
        processor = self._make_processor(session_local_mode_false)

        with pytest.raises(TypeError, match="must be provided as ProcessingOutput objects"):
            processor._normalize_outputs(invalid_output)

    # --- Requirement 3.1 (continued): multiple outputs keep their names and S3 URIs ---

    def test_multiple_outputs_with_s3_uris_preserved(self, session_local_mode_false):
        """Multiple outputs with S3 URIs keep their order, names, and URIs.

        **Validates: Requirements 3.1**
        """
        processor = self._make_processor(session_local_mode_false)

        outputs = [
            *self._single_output(
                "s3://my-bucket/first",
                "first-output",
                local_path="/opt/ml/processing/output1",
            ),
            *self._single_output(
                "s3://my-bucket/second",
                "second-output",
                local_path="/opt/ml/processing/output2",
            ),
        ]

        result = self._normalize(processor, outputs)

        assert len(result) == 2
        assert result[0].output_name == "first-output"
        assert result[1].output_name == "second-output"
        # S3 URIs should be preserved since they already have s3:// scheme
        assert result[0].s3_output.s3_uri == "s3://my-bucket/first"
        assert result[1].s3_output.s3_uri == "s3://my-bucket/second"
494+
495+
241496class TestProcessorStartNew :
242497 def test_start_new_with_pipeline_session (self , mock_session ):
243498 from sagemaker .core .workflow .pipeline_context import PipelineSession
0 commit comments