diff --git a/annot/src/main/java/com/predic8/membrane/annot/beanregistry/BeanCacheObserver.java b/annot/src/main/java/com/predic8/membrane/annot/beanregistry/BeanCacheObserver.java index 3abbac7524..21067b65a0 100644 --- a/annot/src/main/java/com/predic8/membrane/annot/beanregistry/BeanCacheObserver.java +++ b/annot/src/main/java/com/predic8/membrane/annot/beanregistry/BeanCacheObserver.java @@ -35,19 +35,19 @@ public interface BeanCacheObserver { * Called for an add/modify/delete event of a bean. * * @param bd the bean definition changed event - * @param bean the current instance (on ADD/MODIFY) or {@code null} (on DELETE) + * @param newBean the current instance (on ADD/MODIFY) or {@code null} (on DELETE) * @param oldBean the previous instance (on MODIFY) or {@code null} * @throws IOException if handling the event performs I/O and it fails * */ - void handleBeanEvent(BeanDefinitionChanged bd, Object bean, Object oldBean) throws IOException; + void handleBeanEvent(BeanDefinitionChanged bd, Object newBean, Object oldBean) throws IOException; /** * Whether beans of the given definition should be considered activatable/usable * by the runtime. * - * @param bd the bean definition + * @param bd the bean container * @return {@code true} if activatable, {@code false} otherwise */ - boolean isActivatable(BeanDefinition bd); + boolean isActivatable(BeanContainer bd); } diff --git a/annot/src/main/java/com/predic8/membrane/annot/beanregistry/BeanContainer.java b/annot/src/main/java/com/predic8/membrane/annot/beanregistry/BeanContainer.java index d3eba278b7..8a7114433d 100644 --- a/annot/src/main/java/com/predic8/membrane/annot/beanregistry/BeanContainer.java +++ b/annot/src/main/java/com/predic8/membrane/annot/beanregistry/BeanContainer.java @@ -27,6 +27,10 @@ public class BeanContainer { private static final Logger log = LoggerFactory.getLogger(BeanContainer.class); private final BeanDefinition definition; + /** + * The class of the bean. + */ + private final Class clazz; /** * Constructed bean after initialization. */ @@ -35,16 +39,18 @@ public class BeanContainer { /** * Creates a BeanDefinition where the bean has not yet been instantiated and initialized. */ - public BeanContainer(BeanDefinition definition) { + public BeanContainer(BeanDefinition definition, Grammar grammar) { this.definition = definition; + this.clazz = GenericYamlParser.decideClazz(definition.getKind(), grammar, definition.getNode()); } /** * Creates a BeanDefinition where the bean has already been instantiated and initialized. */ - public BeanContainer(BeanDefinition definition, Object singleton) { + public BeanContainer(BeanDefinition definition, @NotNull Object singleton) { this.definition = definition; this.singleton.set(singleton); + this.clazz = singleton.getClass(); } /** @@ -95,6 +101,7 @@ public String toString() { return define(registry, grammar); } + // Singleton: ensure define() runs at most once per BeanContainer. synchronized (this) { Object existing = getSingleton(); @@ -117,4 +124,10 @@ private static boolean isPrototypeScope(BeanDefinition bd) { ); } + /** + * Checks whether this bean definition will produce a bean which can be assigned to the clazz. + */ + public boolean produces(Class clazz) { + return clazz.isAssignableFrom(this.clazz); + } } diff --git a/annot/src/main/java/com/predic8/membrane/annot/beanregistry/BeanRegistryImplementation.java b/annot/src/main/java/com/predic8/membrane/annot/beanregistry/BeanRegistryImplementation.java index 52f9a7fa2b..42868564a0 100644 --- a/annot/src/main/java/com/predic8/membrane/annot/beanregistry/BeanRegistryImplementation.java +++ b/annot/src/main/java/com/predic8/membrane/annot/beanregistry/BeanRegistryImplementation.java @@ -20,11 +20,14 @@ import org.slf4j.*; import javax.annotation.concurrent.*; +import java.io.IOException; import java.lang.reflect.Method; import java.util.*; import java.util.concurrent.*; import java.util.function.*; +import static com.predic8.membrane.annot.yaml.WatchAction.ADDED; + /** * TODO: * - More Tests @@ -41,7 +44,7 @@ public class BeanRegistryImplementation implements BeanRegistry, BeanCollector, private static final Logger log = LoggerFactory.getLogger(BeanRegistryImplementation.class); - private final BeanCacheObserver observer; + private final List observers = Collections.synchronizedList(new ArrayList<>()); private final Grammar grammar; // uid -> bean @@ -67,35 +70,46 @@ record UidAction(String uid, WatchAction action) { record PreDestroyCallback(Object bean, Method method) { } - public BeanRegistryImplementation(BeanCacheObserver observer, BeanRegistryAware registryAware, Grammar grammar) { - this.observer = observer; + public BeanRegistryImplementation(Grammar grammar) { this.grammar = grammar; - registryAware.setRegistry(this); } @Override public void start() { + for (BeanContainer bc : bcs.values()) { + bc.getOrCreate(this, grammar); + } } @Override public void handle(ChangeEvent changeEvent, boolean isLast) { if (changeEvent instanceof StaticConfigurationLoaded) { activationRun(); - observer.handleAsynchronousInitializationResult(uidsToActivate.isEmpty()); + handleAsynchronousInitializationResult(uidsToActivate.isEmpty()); } if (changeEvent instanceof BeanDefinitionChanged(WatchAction action, BeanDefinition bd)) { // Keep the latest BeanDefinition for all actions so activationRun // can see both metadata and the action (including DELETED). - bcs.put(bd.getUid(), new BeanContainer(bd)); - - if (!bd.isComponent() && observer.isActivatable(bd)) { - uidsToActivate.add(new UidAction(bd.getUid(), action)); - } + handleBeanContainerChange(action, new BeanContainer(bd, grammar)); if (isLast) activationRun(); } } + private void handleBeanContainerChange(WatchAction action, BeanContainer bc) { + bcs.put(bc.getDefinition().getUid(), bc); + + if (bc.produces(BeanCacheObserver.class)) { + observers.add((BeanCacheObserver) bc.getOrCreate(this, grammar)); + log.debug("Registered BeanRegistry observer: " + bc); + } + + if (!bc.getDefinition().isComponent() && isActivatable(bc)) { + uidsToActivate.add(new UidAction(bc.getDefinition().getUid(), action)); + } + } + + private void activationRun() { Set uidsToRemove = new HashSet<>(); for (UidAction uidAction : uidsToActivate) { @@ -103,8 +117,7 @@ private void activationRun() { try { Object bean = bc.getOrCreate(this, grammar); - // e.g., inform router about a new ApiProxy or GlobalInterceptor - observer.handleBeanEvent(new BeanDefinitionChanged(uidAction.action, bc.getDefinition()), bean, getOldBean(uidAction.action, bc.getDefinition())); + handleBeanEvent(new BeanDefinitionChanged(uidAction.action, bc.getDefinition()), bean, getOldBean(uidAction.action, bc.getDefinition())); if (uidAction.action.isAdded() || uidAction.action.isModified()) singletonBeans.put(bc.getDefinition().getUid(), bean); @@ -155,6 +168,7 @@ public Grammar getGrammar() { @Override public List getBeans(Class clazz) { return bcs.values().stream() + .filter(bd -> bd.produces(clazz)) .map(bc -> bc.getOrCreate(this, grammar)) .filter(Objects::nonNull) .filter(clazz::isInstance) @@ -183,15 +197,14 @@ public void register(String beanName, Object bean) { throw new IllegalArgumentException("bean must not be null"); var uuid = UUID.randomUUID().toString(); - bcs.put(uuid, - new BeanContainer( - new BeanDefinition( - "component", - computeBeanName(beanName, uuid), - null, - uuid, - null), - bean)); + handleBeanContainerChange(ADDED, new BeanContainer( + new BeanDefinition( + "component", + computeBeanName(beanName, uuid), + null, + uuid, + null), + bean)); singletonBeans.put(uuid, bean); // the return value of 'put' is ignored, since bean registration with // random keys should not yield duplicates anyway. @@ -235,4 +248,38 @@ public void close() { }); } + /** + * Checks whether any registered Observer is interested in the given bean. + */ + private boolean isActivatable(BeanContainer bd) { + synchronized (observers) { + for (BeanCacheObserver observer : observers) { + if (observer.isActivatable(bd)) return true; + } + } + return false; + } + + /** + * Notifies all registered Observers about the result of the asynchronous initialization. + */ + private void handleAsynchronousInitializationResult(boolean empty) { + synchronized (observers) { + for (BeanCacheObserver observer : observers) { + observer.handleAsynchronousInitializationResult(empty); + } + } + } + + /** + * Notifies all registered Observers about a bean change. + */ + private void handleBeanEvent(BeanDefinitionChanged event, Object newBean, @Nullable Object oldBean) throws IOException { + synchronized (observers) { + for (BeanCacheObserver observer : observers) + // e.g., inform router about a new ApiProxy or GlobalInterceptor + observer.handleBeanEvent(event, newBean, oldBean); + } + } + } diff --git a/annot/src/main/java/com/predic8/membrane/annot/yaml/GenericYamlParser.java b/annot/src/main/java/com/predic8/membrane/annot/yaml/GenericYamlParser.java index d1c59f87d9..296851ed08 100644 --- a/annot/src/main/java/com/predic8/membrane/annot/yaml/GenericYamlParser.java +++ b/annot/src/main/java/com/predic8/membrane/annot/yaml/GenericYamlParser.java @@ -141,11 +141,18 @@ private static void validate(Grammar grammar, JsonNode input) throws YamlSchemaV * grammar and delegates to {@link #createAndPopulateNode(ParsingContext, Class, JsonNode)}.

*/ public static Object readMembraneObject(String kind, Grammar grammar, JsonNode node, R registry) throws ParsingException { + return createAndPopulateNode(new ParsingContext<>(kind, registry, grammar), decideClazz(kind, grammar, node), node.get(kind)); + } + + /** + * Detects the class that will be selected to represent the node in Java. + */ + public static Class decideClazz(String kind, Grammar grammar, JsonNode node) { ensureSingleKey(node); Class clazz = grammar.getElement(kind); if (clazz == null) throw new ParsingException("Did not find java class for kind '%s'.".formatted(kind), node); - return createAndPopulateNode(new ParsingContext<>(kind, registry, grammar), clazz, node.get(kind)); + return clazz; } /** @@ -306,6 +313,9 @@ private static Object parseMapToObj(ParsingContext ctx, JsonNode node, String * registered within the registry. */ private static T handlePostConstructAndPreDestroy(ParsingContext ctx, T bean) { + if (bean instanceof BeanRegistryAware beanRegistryAware) { + beanRegistryAware.setRegistry(ctx.registry()); + } ReflectionUtils.doWithMethods(bean.getClass(), method -> { if (method.isAnnotationPresent(PostConstruct.class)) { try { diff --git a/annot/src/test/java/com/predic8/membrane/annot/YAMLBeanParsingTest.java b/annot/src/test/java/com/predic8/membrane/annot/YAMLBeanParsingTest.java index 95baa1856b..406547a282 100644 --- a/annot/src/test/java/com/predic8/membrane/annot/YAMLBeanParsingTest.java +++ b/annot/src/test/java/com/predic8/membrane/annot/YAMLBeanParsingTest.java @@ -101,15 +101,15 @@ void prototypeResolveReference() { @Test void missingClassFailsFastOnResolve() { - BeanRegistry r = parse(""" + var ex = assertThrows(RuntimeException.class, () -> { + BeanRegistry r = parse(""" components: x: bean: scope: singleton """); - - var ex = assertThrows(RuntimeException.class, () -> r.resolve("#/components/x")); - assertAnyErrorContains(ex, "Missing/blank 'class'"); + }); + assertAnyErrorContains(ex, "Missing/blank 'class' in bean spec."); } @Test diff --git a/annot/src/test/java/com/predic8/membrane/annot/beanregistry/BeanRegistryImplementationTest.java b/annot/src/test/java/com/predic8/membrane/annot/beanregistry/BeanRegistryImplementationTest.java index 3a6b387bb7..36fe7b86bc 100644 --- a/annot/src/test/java/com/predic8/membrane/annot/beanregistry/BeanRegistryImplementationTest.java +++ b/annot/src/test/java/com/predic8/membrane/annot/beanregistry/BeanRegistryImplementationTest.java @@ -29,7 +29,8 @@ class BeanRegistryImplementationTest { @BeforeEach void setup() { aware = Mockito.mock(BeanRegistryAware.class); - registry = new BeanRegistryImplementation(null, aware, null); + registry = new BeanRegistryImplementation(null); + registry.register("aware", aware); } @Test diff --git a/annot/src/test/java/com/predic8/membrane/annot/util/YamlParser.java b/annot/src/test/java/com/predic8/membrane/annot/util/YamlParser.java index 8c87d4a490..738cbbcc0c 100644 --- a/annot/src/test/java/com/predic8/membrane/annot/util/YamlParser.java +++ b/annot/src/test/java/com/predic8/membrane/annot/util/YamlParser.java @@ -37,14 +37,6 @@ public class YamlParser { */ private final BeanRegistry beanRegistry; - private static class TestRouter implements BeanRegistryAware { - - @Override - public void setRegistry(BeanRegistry registry) { - - } - } - public YamlParser(String resourceName) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, IOException, InterruptedException { Grammar generator = getGrammar(); @@ -55,11 +47,11 @@ public YamlParser(String resourceName) throws ClassNotFoundException, NoSuchMeth String normalized = resourceName.startsWith("/") ? resourceName.substring(1) : resourceName; - BeanRegistryImplementation impl = new BeanRegistryImplementation(getLatchObserver(cdl), new TestRouter(), generator); + BeanRegistryImplementation impl = new BeanRegistryImplementation(generator); impl.parseYamls(requireNonNull(cl.getResourceAsStream(normalized)), generator); - beanRegistry = impl; + impl.start(); - cdl.await(); + beanRegistry = impl; } private @NotNull Grammar getGrammar() @@ -78,28 +70,6 @@ public YamlParser(String resourceName) throws ClassNotFoundException, NoSuchMeth return (Grammar) grammarClass.getConstructor().newInstance(); } - /** - * Used to get notification about termination of parsing - */ - private static @NotNull BeanCacheObserver getLatchObserver(CountDownLatch cdl) { - return new BeanCacheObserver() { - @Override - public void handleAsynchronousInitializationResult(boolean empty) { - cdl.countDown(); - } - - @Override - public void handleBeanEvent(BeanDefinitionChanged bdc, Object bean, Object oldBean) { - - } - - @Override - public boolean isActivatable(BeanDefinition bd) { - return true; - } - }; - } - /** * Called by reflection from the YAML parser in CompilerHelper * @return BeanRegistry diff --git a/core/src/main/java/com/predic8/membrane/core/cli/RouterCLI.java b/core/src/main/java/com/predic8/membrane/core/cli/RouterCLI.java index de4bc97fd5..b8fa3e0a39 100644 --- a/core/src/main/java/com/predic8/membrane/core/cli/RouterCLI.java +++ b/core/src/main/java/com/predic8/membrane/core/cli/RouterCLI.java @@ -163,13 +163,15 @@ private static Router initRouterByYAML(String location) throws Exception { router.getConfiguration().setBaseLocation(location); GrammarAutoGenerated grammar = new GrammarAutoGenerated(); - BeanRegistryImplementation reg = new BeanRegistryImplementation(router, router, grammar); + BeanRegistryImplementation reg = new BeanRegistryImplementation(grammar); + reg.register("router", router); getConfigDefinition(reg.parseYamlBeanDefinitions(router.getResolverMap().resolve(location), grammar)) .ifPresent(beanDefinition -> router.applyConfiguration((Configuration) reg.resolve(beanDefinition.getName()))); reg.finishStaticConfiguration(); + reg.start(); router.start(); logStartupMessage(); return router; diff --git a/core/src/main/java/com/predic8/membrane/core/interceptor/balancer/BalancerHealthMonitor.java b/core/src/main/java/com/predic8/membrane/core/interceptor/balancer/BalancerHealthMonitor.java index cc78c6b0c0..d1945f738b 100644 --- a/core/src/main/java/com/predic8/membrane/core/interceptor/balancer/BalancerHealthMonitor.java +++ b/core/src/main/java/com/predic8/membrane/core/interceptor/balancer/BalancerHealthMonitor.java @@ -15,12 +15,16 @@ package com.predic8.membrane.core.interceptor.balancer; import com.predic8.membrane.annot.*; +import com.predic8.membrane.annot.beanregistry.BeanRegistry; +import com.predic8.membrane.annot.beanregistry.BeanRegistryAware; import com.predic8.membrane.core.exchange.*; import com.predic8.membrane.core.interceptor.balancer.Node.*; import com.predic8.membrane.core.router.*; import com.predic8.membrane.core.transport.http.*; import com.predic8.membrane.core.transport.http.client.*; import com.predic8.membrane.core.util.*; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import org.jetbrains.annotations.*; import org.slf4j.*; import org.springframework.beans.*; @@ -46,7 +50,7 @@ * @topic 4. Monitoring, Logging and Statistics */ @MCElement(name = "balancerHealthMonitor") -public class BalancerHealthMonitor implements ApplicationContextAware, InitializingBean, DisposableBean { +public class BalancerHealthMonitor implements ApplicationContextAware, BeanRegistryAware, InitializingBean, DisposableBean { private static final Logger log = LoggerFactory.getLogger(BalancerHealthMonitor.class); public static final String BALANCER_HEALTH_MONITOR = "balancer-health-monitor"; @@ -161,11 +165,18 @@ public void setApplicationContext(ApplicationContext applicationContext) throws this.router = applicationContext.getBean(Router.class); } + @Override + public void setRegistry(BeanRegistry registry) { + this.router = registry.getBean(Router.class).orElseThrow(); + } + + @PostConstruct @Override public void afterPropertiesSet() { init(); } + @PreDestroy @Override public void destroy() { stopped = true; diff --git a/core/src/main/java/com/predic8/membrane/core/kubernetes/KubernetesWatcher.java b/core/src/main/java/com/predic8/membrane/core/kubernetes/KubernetesWatcher.java index ae1b6477cc..161b39c709 100644 --- a/core/src/main/java/com/predic8/membrane/core/kubernetes/KubernetesWatcher.java +++ b/core/src/main/java/com/predic8/membrane/core/kubernetes/KubernetesWatcher.java @@ -47,7 +47,8 @@ public class KubernetesWatcher { public KubernetesWatcher(DefaultRouter router) { this.router = router; this.grammar = new GrammarAutoGenerated(); - var br = new BeanRegistryImplementation(router, router, grammar); + var br = new BeanRegistryImplementation(grammar); + br.register("router", router); this.beanRegistry = new AsyncBeanCollector(br); } diff --git a/core/src/main/java/com/predic8/membrane/core/router/DefaultMainComponents.java b/core/src/main/java/com/predic8/membrane/core/router/DefaultMainComponents.java index 75c15473bb..e548a9f18e 100644 --- a/core/src/main/java/com/predic8/membrane/core/router/DefaultMainComponents.java +++ b/core/src/main/java/com/predic8/membrane/core/router/DefaultMainComponents.java @@ -69,7 +69,8 @@ public void init() { log.debug("Initializing."); if (registry == null) { - registry = new BeanRegistryImplementation(null, router, null); + registry = new BeanRegistryImplementation(null); + registry.register("router", router); } registry.registerIfAbsent(HttpClientConfiguration.class, HttpClientConfiguration::new); @@ -175,8 +176,10 @@ public void setRegistry(BeanRegistry registry) { } public BeanRegistry getRegistry() { - if (registry == null) - registry = new BeanRegistryImplementation(null, router, null); + if (registry == null) { + registry = new BeanRegistryImplementation(null); + registry.register("router", router); + } return registry; } diff --git a/core/src/main/java/com/predic8/membrane/core/router/DefaultRouter.java b/core/src/main/java/com/predic8/membrane/core/router/DefaultRouter.java index c5ed72bc68..e843b9e48e 100644 --- a/core/src/main/java/com/predic8/membrane/core/router/DefaultRouter.java +++ b/core/src/main/java/com/predic8/membrane/core/router/DefaultRouter.java @@ -388,8 +388,8 @@ else if (bdc.action().isModified()) { } @Override - public boolean isActivatable(BeanDefinition bd) { - return Proxy.class.isAssignableFrom(new GrammarAutoGenerated().getElement(bd.getKind())); + public boolean isActivatable(BeanContainer bc) { + return bc.produces(Proxy.class); } public AbstractRefreshableApplicationContext getRef() { diff --git a/core/src/test/java/com/predic8/membrane/core/router/TestRouter.java b/core/src/test/java/com/predic8/membrane/core/router/TestRouter.java index 87f25fb9c1..e5a396e79c 100644 --- a/core/src/test/java/com/predic8/membrane/core/router/TestRouter.java +++ b/core/src/test/java/com/predic8/membrane/core/router/TestRouter.java @@ -41,7 +41,7 @@ */ public class TestRouter extends AbstractRouter implements BeanRegistryAware { - protected BeanRegistry registry = new BeanRegistryImplementation(null, this, null); + protected BeanRegistry registry = new BeanRegistryImplementation(null); protected Transport transport = new HttpTransport(); @@ -66,6 +66,7 @@ public class TestRouter extends AbstractRouter implements BeanRegistryAware { @Override public void init() { + registry.register("router", this); transport.init(this); ruleManager.setRouter(this); }