@@ -65,6 +65,7 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
6565 private static final Logger log = LoggerFactory .getLogger (InformerEventSource .class );
6666 // we need direct control for the indexer to propagate the just update resource also to the index
6767 private final PrimaryToSecondaryMapper <P > primaryToSecondaryMapper ;
68+ private final ComplementaryPrimaryToSecondaryIndex <R > complementaryPrimaryToSecondaryIndex ;
6869 private final String id = UUID .randomUUID ().toString ();
6970
7071 public InformerEventSource (
@@ -98,13 +99,18 @@ private InformerEventSource(
9899 // If there is a primary to secondary mapper there is no need for primary to secondary index.
99100 primaryToSecondaryMapper = configuration .getPrimaryToSecondaryMapper ();
100101 if (primaryToSecondaryMapper == null ) {
102+ complementaryPrimaryToSecondaryIndex =
103+ new DefaultComplementaryPrimaryToSecondaryIndex <>(
104+ configuration .getSecondaryToPrimaryMapper ());
101105 addIndexers (
102106 Map .of (
103107 PRIMARY_TO_SECONDARY_INDEX_NAME ,
104108 (R r ) ->
105109 configuration .getSecondaryToPrimaryMapper ().toPrimaryResourceIDs (r ).stream ()
106110 .map (InformerEventSource ::resourceIdToString )
107111 .toList ()));
112+ } else {
113+ complementaryPrimaryToSecondaryIndex = new NOOPComplementaryPrimaryToSecondaryIndex ();
108114 }
109115
110116 final var informerConfig = configuration .getInformerConfig ();
@@ -123,7 +129,7 @@ public void onAdd(R newResource) {
123129 resourceType ().getSimpleName (),
124130 newResource .getMetadata ().getResourceVersion ());
125131 }
126-
132+ complementaryPrimaryToSecondaryIndex . cleanupForResource ( newResource );
127133 onAddOrUpdate (
128134 Operation .ADD , newResource , null , () -> InformerEventSource .super .onAdd (newResource ));
129135 }
@@ -250,28 +256,33 @@ private void propagateEvent(R object) {
250256 public Set <R > getSecondaryResources (P primary ) {
251257
252258 if (useSecondaryToPrimaryIndex ()) {
253-
254- var resources =
255- byIndex (
256- PRIMARY_TO_SECONDARY_INDEX_NAME ,
257- resourceIdToString (ResourceID .fromResource (primary )));
259+ var primaryID = ResourceID .fromResource (primary );
260+ var resources = byIndex (PRIMARY_TO_SECONDARY_INDEX_NAME , resourceIdToString (primaryID ));
258261
259262 log .debug (
260263 "Using informer primary to secondary index to find secondary resources for primary name:"
261264 + " {} namespace: {}. Found {}" ,
262265 primary .getMetadata ().getName (),
263266 primary .getMetadata ().getNamespace (),
264267 resources .size ());
265-
266- return resources .stream ()
267- .map (
268- r -> {
269- Optional <R > resource =
270- temporaryResourceCache .getResourceFromCache (ResourceID .fromResource (r ));
271- return resource .orElse (r );
272- })
273- .collect (Collectors .toSet ());
274-
268+ var complementaryIds =
269+ complementaryPrimaryToSecondaryIndex .getComplementarySecondaryResources (primaryID );
270+ var res =
271+ resources .stream ()
272+ .map (
273+ r -> {
274+ var resourceId = ResourceID .fromResource (r );
275+ Optional <R > resource = temporaryResourceCache .getResourceFromCache (resourceId );
276+ complementaryIds .remove (resourceId );
277+ return resource .orElse (r );
278+ })
279+ .collect (Collectors .toSet ());
280+ complementaryIds .forEach (
281+ id -> {
282+ Optional <R > resource = temporaryResourceCache .getResourceFromCache (id );
283+ resource .ifPresent (res ::add );
284+ });
285+ return res ;
275286 } else {
276287 Set <ResourceID > secondaryIDs = primaryToSecondaryMapper .toSecondaryResourceIDs (primary );
277288 log .debug (
@@ -298,8 +309,9 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res
298309 }
299310
300311 private void handleRecentCreateOrUpdate (Operation operation , R newResource , R oldResource ) {
301- // todo
302- // primaryToSecondaryIndex.onAddOrUpdate(newResource);
312+ if (operation == Operation .ADD ) {
313+ complementaryPrimaryToSecondaryIndex .explicitAdd (newResource );
314+ }
303315 temporaryResourceCache .putResource (
304316 newResource ,
305317 Optional .ofNullable (oldResource )
0 commit comments