Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions annot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
<artifactId>json-schema-validator</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<version>4.9.8</version>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
package com.predic8.membrane.annot.bean;

import com.fasterxml.jackson.databind.*;
import com.predic8.membrane.annot.beanregistry.BeanRegistry;
import com.predic8.membrane.annot.util.*;
import com.predic8.membrane.annot.yaml.*;
import org.jetbrains.annotations.*;

import java.lang.reflect.*;
Expand Down Expand Up @@ -239,7 +239,7 @@ private Field findField(Class<?> clazz, String name) {

private Object resolveValueOrRef(Class<?> targetType, String value, String ref) {
if (ref != null && !ref.isBlank()) {
Object o = registry.resolveReference(ref);
Object o = registry.resolve(ref);
if (o != null && !targetType.isInstance(o)) {
if (!(targetType.isPrimitive() && isWrapperOfPrimitive(targetType, o.getClass()))) {
throw new IllegalArgumentException("Ref '%s' is not assignable to %s".formatted(ref, targetType.getName()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.predic8.membrane.annot.beanregistry;

import javax.annotation.concurrent.GuardedBy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

/**
* Thread-safe, asynchronous wrapper for {@link BeanCollector}.
*/
public class AsyncBeanCollector implements BeanCollector {

private final BlockingQueue<ChangeEvent> changeEvents = new LinkedBlockingDeque<>();
private final BeanCollector delegate;

@GuardedBy("this")
Thread t;

public AsyncBeanCollector(BeanCollector delegate) {
this.delegate = delegate;
}

@Override
public void handle(ChangeEvent changeEvent, boolean isLast) {
changeEvents.add(changeEvent);
}

@Override
public synchronized void start() {
if (t != null)
return;
t = new Thread(() -> {
while (true) {
try {
ChangeEvent changeEvent = changeEvents.take();
delegate.handle(changeEvent, changeEvents.isEmpty());
} catch (InterruptedException e) {
break;
}
}
});
t.start();
}
Comment on lines +31 to +42
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Thread lifecycle and configuration issues.

Several concerns with the background thread:

  1. No shutdown mechanism: There's no stop() method to gracefully terminate the thread. Consider adding one that interrupts the thread and awaits termination.

  2. Non-daemon thread: The thread will prevent JVM shutdown. Consider setting t.setDaemon(true) or providing explicit shutdown.

  3. Thread naming: Add a descriptive name for debugging: t.setName("AsyncBeanCollector-worker").

  4. Exception handling: If delegate.handle() throws an unchecked exception, the thread will terminate silently.

🔎 Proposed improvements
     @Override
     public synchronized void start() {
         if (t != null)
             return;
         t = new Thread(() -> {
             while (true) {
                 try {
                     ChangeEvent changeEvent = changeEvents.take();
                     delegate.handle(changeEvent, changeEvents.isEmpty());
                 } catch (InterruptedException e) {
                     break;
+                } catch (Exception e) {
+                    // Log and continue processing
+                    e.printStackTrace();
                 }
             }
         });
+        t.setName("AsyncBeanCollector-worker");
+        t.setDaemon(true);
         t.start();
     }
+
+    public synchronized void stop() {
+        if (t != null) {
+            t.interrupt();
+            try {
+                t.join(5000);
+            } catch (InterruptedException ignored) {
+                Thread.currentThread().interrupt();
+            }
+            t = null;
+        }
+    }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
annot/src/main/java/com/predic8/membrane/annot/beanregistry/AsyncBeanCollector.java
around lines 31-42, the background thread lacks lifecycle controls and
resilience: add a public stop() method that interrupts the worker thread and
waits (join with a timeout) for it to terminate, set the thread to daemon
(t.setDaemon(true)) and give it a descriptive name
(t.setName("AsyncBeanCollector-worker")), and wrap delegate.handle(...) in a
try/catch(Throwable) that logs unexpected runtime errors and continues the loop
so the thread does not die silently; ensure InterruptedException handling breaks
the loop and preserves interrupt status as appropriate.


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.predic8.membrane.annot.beanregistry;

import com.predic8.membrane.annot.Grammar;
import com.predic8.membrane.annot.yaml.GenericYamlParser;
import com.predic8.membrane.annot.yaml.WatchAction;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;

/**
* This is the definition side of a {@link BeanRegistryImplementation}. You can start the bean registry
* and send it a series of change events.
*/
public interface BeanCollector {
/**
* Utility method to ingest a stream of YAML objects as a static configuration and then
* start the bean registry.
* @param yamls stream of YAML objects
* @param grammar the grammar to use for parsing
*/
default void parseYamls(InputStream yamls, Grammar grammar) throws IOException {
List<BeanDefinition> bds = GenericYamlParser.parseMembraneResources(yamls, grammar);
for (int i = 0; i < bds.size(); i++) {
handle(new BeanDefinitionChanged(WatchAction.ADDED, bds.get(i)), i == bds.size() - 1);
}
handle(new StaticConfigurationLoaded(), true);
start();
}

/**
* @param changeEvent the change event
* @param isLast indicates whether this is the last change event for this batch of changes
*/
void handle(ChangeEvent changeEvent, boolean isLast);

/**
* Starts the bean registry.
*/
void start();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.predic8.membrane.annot.beanregistry;

public class BeanContainer {
private final BeanDefinition definition;
/**
* Constructed bean after initialization.
*/
private volatile Object singleton;

public BeanContainer(BeanDefinition definition) {
this.definition = definition;
}


public Object getSingleton() {
return singleton;
}

public void setSingleton(Object singleton) {
this.singleton = singleton;
}

public BeanDefinition getDefinition() {
return definition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
package com.predic8.membrane.annot.yaml;
package com.predic8.membrane.annot.beanregistry;

import com.fasterxml.jackson.databind.JsonNode;
import com.predic8.membrane.annot.*;
import com.predic8.membrane.annot.bean.*;

import java.util.*;
import com.predic8.membrane.annot.yaml.WatchAction;

import static com.predic8.membrane.annot.yaml.WatchAction.*;

Expand All @@ -29,19 +26,12 @@ public class BeanDefinition {
private final String namespace;
private final String uid;
private final JsonNode node;
private final WatchAction action;
private final String kind;

/**
* Constructed bean after initialization.
*/
private Object bean;

/**
* Only called from K8S.
*/
private BeanDefinition(WatchAction action, JsonNode node) {
this.action = action;
private BeanDefinition(JsonNode node) {
this.node = node;
JsonNode metadata = node.get("metadata");
var kind2 = node.get("kind").asText();
Expand All @@ -55,8 +45,8 @@ private BeanDefinition(WatchAction action, JsonNode node) {
uid = metadata.get("uid").asText();
}

public static BeanDefinition create4Kubernetes(WatchAction action, JsonNode node) {
return new BeanDefinition(action, node);
public static BeanDefinitionChanged create4Kubernetes(WatchAction action, JsonNode node) {
return new BeanDefinitionChanged(action, new BeanDefinition(node));
}

public BeanDefinition(String kind, String name, String namespace, String uid, JsonNode node) {
Expand All @@ -65,17 +55,12 @@ public BeanDefinition(String kind, String name, String namespace, String uid, Js
this.namespace = namespace;
this.uid = uid;
this.node = node;
this.action = ADDED;
}

public JsonNode getNode() {
return node;
}

public WatchAction getAction() {
return action;
}

public String getNamespace() {
return namespace;
}
Expand All @@ -92,15 +77,6 @@ public String getKind() {
return kind;
}

public Object getBean() {
return bean;
}

// TODO: Rest is immutable - can we make this also?
public void setBean(Object bean) {
this.bean = bean;
}

public String getScope() {
JsonNode meta = node.get("metadata");
if (meta == null)
Expand All @@ -124,15 +100,4 @@ public boolean isPrototype() {
return PROTOTYPE.equals(getScope());
}

public boolean isDeleted() {
return action == DELETED;
}

public boolean isModified() {
return action == MODIFIED;
}

public boolean isAdded() {
return action == ADDED;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.predic8.membrane.annot.beanregistry;

import com.predic8.membrane.annot.yaml.WatchAction;

/**
* Signals that a BeanDefinition has changed (=was added, modified, or deleted).
*/
public record BeanDefinitionChanged(
WatchAction action,
BeanDefinition bd) implements ChangeEvent {
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,19 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
package com.predic8.membrane.annot.yaml;
package com.predic8.membrane.annot.beanregistry;

import com.predic8.membrane.annot.*;

import java.util.*;

public interface BeanRegistry {

Object resolveReference(String url);
Object resolve(String url);

List<Object> getBeans();

void registerBeanDefinitions(List<BeanDefinition> beanDefinitions);

void start();
<T> List<T> getBeans(Class<T> clazz);

Grammar getGrammar();

Expand Down
Loading
Loading