@@ -752,6 +752,237 @@ def fake_stream(func, **kw):
752752 # list_func called twice: initial list + re-list after 410.
753753 self .assertEqual (list_func .call_count , 2 )
754754
755+ # ------------------------------------------------------------------
756+ # Tests analogous to client-go shared_informer_test.go scenarios.
757+ # ------------------------------------------------------------------
758+
759+ def test_same_handler_registered_twice_fires_twice (self ):
760+ """Registering the same callable twice is two independent registrations.
761+
762+ Analogous to Go TestSharedInformerMultipleRegistration: the same
763+ handler callable can be added twice, fires twice per event, and
764+ removing one registration leaves the other active.
765+ """
766+ pod = _make_pod ("default" , "dup-pod" )
767+ received = []
768+
769+ list_func = MagicMock ()
770+ list_resp = MagicMock ()
771+ list_resp .items = []
772+ list_resp .metadata = MagicMock (resource_version = "1" )
773+ list_func .return_value = list_resp
774+
775+ informer = SharedInformer (list_func = list_func )
776+ handler = received .append
777+ informer .add_event_handler (ADDED , handler )
778+ informer .add_event_handler (ADDED , handler ) # same callable, second registration
779+
780+ with patch ("kubernetes.informer.informer.Watch" ) as MockWatch :
781+ mock_w = MagicMock ()
782+
783+ def fake_stream (func , ** kw ):
784+ yield {"type" : "ADDED" , "object" : pod }
785+ informer ._stop_event .set ()
786+
787+ mock_w .stream .side_effect = fake_stream
788+ MockWatch .return_value = mock_w
789+
790+ informer .start ()
791+ informer ._thread .join (timeout = 3 )
792+
793+ # Two registrations → two calls
794+ self .assertEqual (received .count (pod ), 2 )
795+
796+ # After removing one registration the other still works.
797+ informer .remove_event_handler (ADDED , handler )
798+ received .clear ()
799+ informer ._fire (ADDED , pod )
800+ self .assertEqual (received .count (pod ), 1 )
801+
802+ def test_remove_handler_while_running_stops_events (self ):
803+ """Removing a handler mid-run stops it receiving subsequent events.
804+
805+ Analogous to Go TestRemoveWhileActive.
806+ """
807+ pod1 = _make_pod ("default" , "pod1" )
808+ pod2 = _make_pod ("default" , "pod2" )
809+ received = []
810+
811+ list_func = MagicMock ()
812+ list_resp = MagicMock ()
813+ list_resp .items = []
814+ list_resp .metadata = MagicMock (resource_version = "1" )
815+ list_func .return_value = list_resp
816+
817+ # pod1_seen is set by the handler after pod1 is processed.
818+ pod1_seen = threading .Event ()
819+ # can_send_pod2 is set by the test thread to allow pod2 to be yielded.
820+ can_send_pod2 = threading .Event ()
821+
822+ informer = SharedInformer (list_func = list_func )
823+
824+ def handler (obj ):
825+ received .append (obj )
826+ if obj is pod1 :
827+ pod1_seen .set ()
828+
829+ informer .add_event_handler (ADDED , handler )
830+
831+ with patch ("kubernetes.informer.informer.Watch" ) as MockWatch :
832+ mock_w = MagicMock ()
833+
834+ def fake_stream (func , ** kw ):
835+ yield {"type" : "ADDED" , "object" : pod1 }
836+ # Block until the test thread has removed the handler.
837+ can_send_pod2 .wait (timeout = 5 )
838+ yield {"type" : "ADDED" , "object" : pod2 }
839+ informer ._stop_event .set ()
840+
841+ mock_w .stream .side_effect = fake_stream
842+ MockWatch .return_value = mock_w
843+
844+ informer .start ()
845+ # Wait until pod1 has been processed, then remove the handler.
846+ pod1_seen .wait (timeout = 3 )
847+ informer .remove_event_handler (ADDED , handler )
848+ can_send_pod2 .set ()
849+ informer ._thread .join (timeout = 3 )
850+
851+ self .assertIn (pod1 , received )
852+ self .assertNotIn (pod2 , received )
853+
854+ def test_add_handler_while_running_receives_subsequent_events (self ):
855+ """Adding a handler while the informer is running fires it for subsequent events.
856+
857+ Analogous to Go TestAddWhileActive.
858+ """
859+ pod1 = _make_pod ("default" , "pod1-aw" )
860+ pod2 = _make_pod ("default" , "pod2-aw" )
861+ received1 = []
862+ received2 = []
863+
864+ list_func = MagicMock ()
865+ list_resp = MagicMock ()
866+ list_resp .items = []
867+ list_resp .metadata = MagicMock (resource_version = "1" )
868+ list_func .return_value = list_resp
869+
870+ pod1_processed = threading .Event ()
871+ can_send_pod2 = threading .Event ()
872+
873+ informer = SharedInformer (list_func = list_func )
874+ informer .add_event_handler (ADDED , received1 .append )
875+
876+ with patch ("kubernetes.informer.informer.Watch" ) as MockWatch :
877+ mock_w = MagicMock ()
878+
879+ def fake_stream (func , ** kw ):
880+ yield {"type" : "ADDED" , "object" : pod1 }
881+ # Signal that pod1 has been yielded and processed.
882+ pod1_processed .set ()
883+ # Wait until the test thread registers handler2.
884+ can_send_pod2 .wait (timeout = 5 )
885+ yield {"type" : "ADDED" , "object" : pod2 }
886+ informer ._stop_event .set ()
887+
888+ mock_w .stream .side_effect = fake_stream
889+ MockWatch .return_value = mock_w
890+
891+ informer .start ()
892+ # After pod1 is processed, register the second handler.
893+ pod1_processed .wait (timeout = 3 )
894+ informer .add_event_handler (ADDED , received2 .append )
895+ can_send_pod2 .set ()
896+ informer ._thread .join (timeout = 3 )
897+
898+ # handler1 sees both pods; handler2 (added late) only sees pod2.
899+ self .assertIn (pod1 , received1 )
900+ self .assertIn (pod2 , received1 )
901+ self .assertNotIn (pod1 , received2 )
902+ self .assertIn (pod2 , received2 )
903+
904+ def test_concurrent_handler_registration_is_thread_safe (self ):
905+ """Concurrent add/remove of handlers from many threads must not raise.
906+
907+ Analogous to Go TestSharedInformerHandlerAbuse (thread safety portion).
908+ """
909+ list_func = MagicMock ()
910+ list_resp = MagicMock ()
911+ list_resp .items = []
912+ list_resp .metadata = MagicMock (resource_version = "1" )
913+ list_func .return_value = list_resp
914+
915+ informer = SharedInformer (list_func = list_func )
916+ errors = []
917+
918+ def worker ():
919+ try :
920+ for _ in range (30 ):
921+ fn = lambda obj : None # noqa: E731
922+ informer .add_event_handler (ADDED , fn )
923+ informer .add_event_handler (MODIFIED , fn )
924+ informer .remove_event_handler (ADDED , fn )
925+ informer .remove_event_handler (MODIFIED , fn )
926+ except Exception as exc :
927+ errors .append (exc )
928+
929+ threads = [threading .Thread (target = worker ) for _ in range (10 )]
930+ for t in threads :
931+ t .start ()
932+ for t in threads :
933+ t .join ()
934+
935+ self .assertEqual (errors , [])
936+
937+ def test_watch_disruption_existing_items_fire_modified_after_relist (self ):
938+ """After a 410-triggered re-list, items in both old and new list fire MODIFIED.
939+
940+ Analogous to Go TestSharedInformerWatchDisruption: when a watch is
941+ disrupted and the re-list returns the same objects (possibly with
942+ updates), listeners receive MODIFIED for them.
943+ """
944+ from kubernetes .client .exceptions import ApiException
945+
946+ pod = _make_pod ("default" , "stable-pod" )
947+
948+ list_call = {"n" : 0 }
949+
950+ def list_func (** kw ):
951+ list_call ["n" ] += 1
952+ resp = MagicMock ()
953+ resp .items = [pod ] # pod present in both lists
954+ resp .metadata = MagicMock (resource_version = str (list_call ["n" ]))
955+ return resp
956+
957+ modified = []
958+ informer = SharedInformer (list_func = list_func )
959+ informer .add_event_handler (MODIFIED , modified .append )
960+
961+ stream_calls = {"n" : 0 }
962+
963+ with patch ("kubernetes.informer.informer.Watch" ) as MockWatch :
964+ mock_w = MagicMock ()
965+ mock_w .resource_version = "1"
966+
967+ def fake_stream (func , ** kw ):
968+ stream_calls ["n" ] += 1
969+ if stream_calls ["n" ] == 1 :
970+ raise ApiException (status = 410 , reason = "Gone" )
971+ informer ._stop_event .set ()
972+ return iter ([])
973+
974+ mock_w .stream .side_effect = fake_stream
975+ MockWatch .return_value = mock_w
976+
977+ informer .start ()
978+ informer ._thread .join (timeout = 3 )
979+
980+ # pod was in both the initial list (call 1) and the re-list (call 2).
981+ # On the re-list it should fire MODIFIED (not ADDED again).
982+ self .assertIn (pod , modified )
983+ # Still in cache.
984+ self .assertIsNotNone (informer .cache .get_by_key ("default/stable-pod" ))
985+
755986
756987if __name__ == "__main__" :
757988 unittest .main ()
0 commit comments