11package io .javaoperatorsdk .operator .processing .event .source .informer ;
22
3- import java .util .Map ;
43import java .util .Optional ;
54import java .util .Set ;
65import java .util .UUID ;
@@ -59,13 +58,11 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
5958 extends ManagedInformerEventSource <R , P , InformerEventSourceConfiguration <R >>
6059 implements ResourceEventHandler <R > {
6160
62- public static final String PRIMARY_TO_SECONDARY_INDEX_NAME = "primaryToSecondary" ;
63-
6461 public static final String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous" ;
6562 private static final Logger log = LoggerFactory .getLogger (InformerEventSource .class );
6663 // we need direct control for the indexer to propagate the just update resource also to the index
64+ private final PrimaryToSecondaryIndex <R > primaryToSecondaryIndex ;
6765 private final PrimaryToSecondaryMapper <P > primaryToSecondaryMapper ;
68- private final TemporalPrimaryToSecondaryIndex <R > temporalPrimaryToSecondaryIndex ;
6966 private final String id = UUID .randomUUID ().toString ();
7067
7168 public InformerEventSource (
@@ -99,17 +96,11 @@ private InformerEventSource(
9996 // If there is a primary to secondary mapper there is no need for primary to secondary index.
10097 primaryToSecondaryMapper = configuration .getPrimaryToSecondaryMapper ();
10198 if (useSecondaryToPrimaryIndex ()) {
102- temporalPrimaryToSecondaryIndex =
103- new DefaultTemporalPrimaryToSecondaryIndex <>(configuration .getSecondaryToPrimaryMapper ());
104- addIndexers (
105- Map .of (
106- PRIMARY_TO_SECONDARY_INDEX_NAME ,
107- (R r ) ->
108- configuration .getSecondaryToPrimaryMapper ().toPrimaryResourceIDs (r ).stream ()
109- .map (InformerEventSource ::resourceIdToString )
110- .toList ()));
99+ primaryToSecondaryIndex =
100+ // The index uses the secondary to primary mapper (always present) to build the index
101+ new DefaultPrimaryToSecondaryIndex <>(configuration .getSecondaryToPrimaryMapper ());
111102 } else {
112- temporalPrimaryToSecondaryIndex = NOOPTemporalPrimaryToSecondaryIndex .getInstance ();
103+ primaryToSecondaryIndex = NOOPPrimaryToSecondaryIndex .getInstance ();
113104 }
114105
115106 final var informerConfig = configuration .getInformerConfig ();
@@ -128,6 +119,7 @@ public void onAdd(R newResource) {
128119 resourceType ().getSimpleName (),
129120 newResource .getMetadata ().getResourceVersion ());
130121 }
122+ primaryToSecondaryIndex .onAddOrUpdate (newResource );
131123 onAddOrUpdate (
132124 Operation .ADD , newResource , null , () -> InformerEventSource .super .onAdd (newResource ));
133125 }
@@ -142,7 +134,7 @@ public void onUpdate(R oldObject, R newObject) {
142134 newObject .getMetadata ().getResourceVersion (),
143135 oldObject .getMetadata ().getResourceVersion ());
144136 }
145-
137+ primaryToSecondaryIndex . onAddOrUpdate ( newObject );
146138 onAddOrUpdate (
147139 Operation .UPDATE ,
148140 newObject ,
@@ -158,17 +150,23 @@ public void onDelete(R resource, boolean b) {
158150 ResourceID .fromResource (resource ),
159151 resourceType ().getSimpleName ());
160152 }
161- temporalPrimaryToSecondaryIndex . cleanupForResource (resource );
153+ primaryToSecondaryIndex . onDelete (resource );
162154 super .onDelete (resource , b );
163155 if (acceptedByDeleteFilters (resource , b )) {
164156 propagateEvent (resource );
165157 }
166158 }
167159
160+ @ Override
161+ public synchronized void start () {
162+ super .start ();
163+ manager ().list ().forEach (primaryToSecondaryIndex ::onAddOrUpdate );
164+ }
165+
168166 private synchronized void onAddOrUpdate (
169167 Operation operation , R newObject , R oldObject , Runnable superOnOp ) {
170168 var resourceID = ResourceID .fromResource (newObject );
171- temporalPrimaryToSecondaryIndex . cleanupForResource ( newObject );
169+
172170 if (canSkipEvent (newObject , oldObject , resourceID )) {
173171 log .debug (
174172 "Skipping event propagation for {}, since was a result of a reconcile action. Resource"
@@ -252,68 +250,42 @@ private void propagateEvent(R object) {
252250
253251 @ Override
254252 public Set <R > getSecondaryResources (P primary ) {
255-
253+ Set < ResourceID > secondaryIDs ;
256254 if (useSecondaryToPrimaryIndex ()) {
257- var primaryID = ResourceID .fromResource (primary );
258- // Note that the order matter is these lines. This method is not synchronized
259- // because of performance reasons. If it was in reverse order, it could happen
260- // that we did not receive yet an event in the informer so the index would not
261- // be updated. However, before reading it from temp IDs the event arrives and erases
262- // the temp index. So in case of Add not id would be found.
263- var temporalIds = temporalPrimaryToSecondaryIndex .getSecondaryResources (primaryID );
264- var resources = byIndex (PRIMARY_TO_SECONDARY_INDEX_NAME , resourceIdToString (primaryID ));
265-
255+ var primaryResourceID = ResourceID .fromResource (primary );
256+ secondaryIDs = primaryToSecondaryIndex .getSecondaryResources (primaryResourceID );
266257 log .debug (
267- "Using informer primary to secondary index to find secondary resources for primary name:"
268- + " {} namespace: {}. Found number {}" ,
269- primary .getMetadata ().getName (),
270- primary .getMetadata ().getNamespace (),
271- resources .size ());
272-
273- log .debug ("Complementary ids: {}" , temporalIds );
274- var res =
275- resources .stream ()
276- .map (
277- r -> {
278- var resourceId = ResourceID .fromResource (r );
279- Optional <R > resource = temporaryResourceCache .getResourceFromCache (resourceId );
280- temporalIds .remove (resourceId );
281- return resource .orElse (r );
282- })
283- .collect (Collectors .toSet ());
284- temporalIds .forEach (
285- id -> {
286- Optional <R > resource = get (id );
287- resource .ifPresentOrElse (res ::add , () -> log .warn ("Resource not found: {}" , id ));
288- });
289- return res ;
258+ "Using PrimaryToSecondaryIndex to find secondary resources for primary: {}. Found"
259+ + " secondary ids: {} " ,
260+ primaryResourceID ,
261+ secondaryIDs );
290262 } else {
291- Set < ResourceID > secondaryIDs = primaryToSecondaryMapper .toSecondaryResourceIDs (primary );
263+ secondaryIDs = primaryToSecondaryMapper .toSecondaryResourceIDs (primary );
292264 log .debug (
293265 "Using PrimaryToSecondaryMapper to find secondary resources for primary: {}. Found"
294266 + " secondary ids: {} " ,
295267 primary ,
296268 secondaryIDs );
297- return secondaryIDs .stream ()
298- .map (this ::get )
299- .flatMap (Optional ::stream )
300- .collect (Collectors .toSet ());
301269 }
270+ return secondaryIDs .stream ()
271+ .map (this ::get )
272+ .flatMap (Optional ::stream )
273+ .collect (Collectors .toSet ());
302274 }
303275
304276 @ Override
305277 public synchronized void handleRecentResourceUpdate (
306278 ResourceID resourceID , R resource , R previousVersionOfResource ) {
307- handleRecentCreateOrUpdate (resource , previousVersionOfResource );
279+ handleRecentCreateOrUpdate (Operation . UPDATE , resource , previousVersionOfResource );
308280 }
309281
310282 @ Override
311283 public synchronized void handleRecentResourceCreate (ResourceID resourceID , R resource ) {
312- handleRecentCreateOrUpdate (resource , null );
284+ handleRecentCreateOrUpdate (Operation . ADD , resource , null );
313285 }
314286
315- private void handleRecentCreateOrUpdate (R newResource , R oldResource ) {
316- temporalPrimaryToSecondaryIndex . explicitAddOrUpdate (newResource );
287+ private void handleRecentCreateOrUpdate (Operation operation , R newResource , R oldResource ) {
288+ primaryToSecondaryIndex . onAddOrUpdate (newResource );
317289 temporaryResourceCache .putResource (
318290 newResource ,
319291 Optional .ofNullable (oldResource )
@@ -366,8 +338,4 @@ private enum Operation {
366338 ADD ,
367339 UPDATE
368340 }
369-
370- private static String resourceIdToString (ResourceID resourceID ) {
371- return resourceID .getName () + "#" + resourceID .getNamespace ().orElse ("$na" );
372- }
373341}
0 commit comments