@@ -412,8 +412,19 @@ def test_agent_error_returns_500(self, client, monkeypatch):
412412 assert resp .status_code == 500
413413 assert "Agent processing failed" in resp .json ()["detail" ]
414414
415- def test_with_subscription_metadata (self , client ):
416- """Subscription field is used for user_id derivation."""
415+ def test_with_subscription_metadata (self , client , monkeypatch ):
416+ """Subscription field is sanitized for user_id (slashes replaced)."""
417+ captured_user_ids = []
418+
419+ async def dummy_run_async_capture (
420+ self , user_id , session_id , new_message , ** kwargs
421+ ):
422+ captured_user_ids .append (user_id )
423+ yield _model_event ("Success" )
424+ await asyncio .sleep (0 )
425+
426+ monkeypatch .setattr (Runner , "run_async" , dummy_run_async_capture )
427+
417428 message_data = base64 .b64encode (b"test" ).decode ("utf-8" )
418429 payload = {
419430 "message" : {"data" : message_data },
@@ -422,6 +433,32 @@ def test_with_subscription_metadata(self, client):
422433 resp = client .post ("/apps/test_app/trigger/pubsub" , json = payload )
423434
424435 assert resp .status_code == 200
436+ assert len (captured_user_ids ) == 1
437+ assert captured_user_ids [0 ] == "projects--p--subscriptions--orders-sub"
438+ assert "/" not in captured_user_ids [0 ]
439+
440+ def test_default_user_id_when_no_subscription (self , client , monkeypatch ):
441+ """Default user_id is used when subscription is absent."""
442+ captured_user_ids = []
443+
444+ async def dummy_run_async_capture (
445+ self , user_id , session_id , new_message , ** kwargs
446+ ):
447+ captured_user_ids .append (user_id )
448+ yield _model_event ("Success" )
449+ await asyncio .sleep (0 )
450+
451+ monkeypatch .setattr (Runner , "run_async" , dummy_run_async_capture )
452+
453+ message_data = base64 .b64encode (b"test" ).decode ("utf-8" )
454+ payload = {
455+ "message" : {"data" : message_data },
456+ }
457+ resp = client .post ("/apps/test_app/trigger/pubsub" , json = payload )
458+
459+ assert resp .status_code == 200
460+ assert len (captured_user_ids ) == 1
461+ assert captured_user_ids [0 ] == "pubsub-caller"
425462
426463 def test_unknown_app_fails_early (
427464 self , client , mock_agent_loader , mock_session_service
@@ -491,27 +528,87 @@ async def dummy_run_async_capture(
491528 == "google.cloud.storage.object.v1.finalized"
492529 )
493530
494- def test_source_derived_from_body (self , client ):
495- """Source from body is used for user_id."""
531+ def test_source_derived_from_body_sanitized (self , client , monkeypatch ):
532+ """Source from body is sanitized for user_id (slashes replaced)."""
533+ captured_user_ids = []
534+
535+ async def dummy_run_async_capture (
536+ self , user_id , session_id , new_message , ** kwargs
537+ ):
538+ captured_user_ids .append (user_id )
539+ yield _model_event ("Success" )
540+ await asyncio .sleep (0 )
541+
542+ monkeypatch .setattr (Runner , "run_async" , dummy_run_async_capture )
543+
496544 payload = {
497545 "data" : {"key" : "value" },
498- "source" : "my-custom-source " ,
546+ "source" : "//pubsub.googleapis.com/projects/p/topics/t " ,
499547 }
500548 resp = client .post ("/apps/test_app/trigger/eventarc" , json = payload )
501549
502550 assert resp .status_code == 200
551+ assert len (captured_user_ids ) == 1
552+ assert (
553+ captured_user_ids [0 ] == "pubsub.googleapis.com--projects--p--topics--t"
554+ )
555+ assert "/" not in captured_user_ids [0 ]
556+
557+ def test_source_from_ce_header_sanitized (self , client , monkeypatch ):
558+ """ce-source header is sanitized for user_id (slashes replaced)."""
559+ captured_user_ids = []
560+
561+ async def dummy_run_async_capture (
562+ self , user_id , session_id , new_message , ** kwargs
563+ ):
564+ captured_user_ids .append (user_id )
565+ yield _model_event ("Success" )
566+ await asyncio .sleep (0 )
567+
568+ monkeypatch .setattr (Runner , "run_async" , dummy_run_async_capture )
503569
504- def test_source_from_ce_header (self , client ):
505- """ce-source header is used when body source is absent."""
506570 payload = {
507571 "data" : {"key" : "value" },
508572 }
509573 resp = client .post (
510574 "/apps/test_app/trigger/eventarc" ,
511575 json = payload ,
512- headers = {"ce-source" : "header-source" },
576+ headers = {
577+ "ce-source" : (
578+ "//storage.googleapis.com/projects/_/buckets/my-bucket"
579+ ),
580+ },
513581 )
582+
583+ assert resp .status_code == 200
584+ assert len (captured_user_ids ) == 1
585+ assert (
586+ captured_user_ids [0 ]
587+ == "storage.googleapis.com--projects--_--buckets--my-bucket"
588+ )
589+ assert "/" not in captured_user_ids [0 ]
590+
591+ def test_default_user_id_when_no_source (self , client , monkeypatch ):
592+ """Default user_id is used when source is absent."""
593+ captured_user_ids = []
594+
595+ async def dummy_run_async_capture (
596+ self , user_id , session_id , new_message , ** kwargs
597+ ):
598+ captured_user_ids .append (user_id )
599+ yield _model_event ("Success" )
600+ await asyncio .sleep (0 )
601+
602+ monkeypatch .setattr (Runner , "run_async" , dummy_run_async_capture )
603+
604+ payload = {
605+ "data" : {"key" : "value" },
606+ }
607+ resp = client .post ("/apps/test_app/trigger/eventarc" , json = payload )
608+
514609 assert resp .status_code == 200
610+ assert len (captured_user_ids ) == 1
611+ assert captured_user_ids [0 ] == "eventarc-caller"
515612
516613 def test_complex_event_data (self , client , monkeypatch ):
517614 """Complex nested event data is serialized as JSON for the agent."""
0 commit comments