Skip to content

Commit 29e2313

Browse files
committed
fix: improve observer stability, performance, and security
Signed-off-by: Twiineenock <Twiineenock@users.noreply.github.com>
1 parent 1863733 commit 29e2313

10 files changed

Lines changed: 85 additions & 25 deletions

File tree

.github/workflows/codeql.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ jobs:
5454
# If you are analyzing a compiled language, you can modify the 'build-mode' for that language to customize how
5555
# your codebase is analyzed, see https://docs.github.com/en/code-security/code-scanning/creating-an-advanced-setup-for-code-scanning/codeql-code-scanning-for-compiled-languages
5656
steps:
57+
- name: Harden Runner
58+
uses: step-security/harden-runner@v2.10.1
59+
with:
60+
egress-policy: audit
61+
5762
- name: Checkout repository
5863
uses: actions/checkout@v6.0.2
5964

.github/workflows/maven.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ jobs:
2020
runs-on: ubuntu-latest
2121

2222
steps:
23+
- name: Harden Runner
24+
uses: step-security/harden-runner@v2.10.1
25+
with:
26+
egress-policy: audit
27+
2328
- name: Checkout Repo
2429
uses: actions/checkout@v6.0.2
2530

.github/workflows/scorecard.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ jobs:
3131
# actions: read
3232

3333
steps:
34+
- name: Harden Runner
35+
uses: step-security/harden-runner@v2.10.1
36+
with:
37+
egress-policy: audit
38+
3439
- name: "Checkout code"
3540
uses: actions/checkout@0c366fd6a839edf440554fa01a7085ccba70ac98 # v5.0.1
3641
with:

hiero-enterprise-base/src/main/java/org/hiero/base/observer/AbstractPollingObserver.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,17 @@ public abstract class AbstractPollingObserver<T> {
2929
/**
3030
* Creates a new polling observer.
3131
*
32+
* @param executorService The shared executor service to use for polling.
3233
* @param pollingInterval How often to check for new events.
3334
* @param listener The callback to trigger when an event is found.
3435
*/
35-
protected AbstractPollingObserver(@NonNull Duration pollingInterval, @NonNull EventObserver<T> listener) {
36+
protected AbstractPollingObserver(
37+
@NonNull ScheduledExecutorService executorService,
38+
@NonNull Duration pollingInterval,
39+
@NonNull EventObserver<T> listener) {
40+
this.executorService = executorService;
3641
this.pollingInterval = pollingInterval;
3742
this.listener = listener;
38-
this.executorService = Executors.newSingleThreadScheduledExecutor(r -> {
39-
Thread thread = new Thread(r, "hiero-observer-" + getClass().getSimpleName());
40-
thread.setDaemon(true);
41-
return thread;
42-
});
4343
}
4444

4545
/**
@@ -48,34 +48,43 @@ protected AbstractPollingObserver(@NonNull Duration pollingInterval, @NonNull Ev
4848
public void start() {
4949
if (running.compareAndSet(false, true)) {
5050
log.info("Starting Hiero observer: {}", getClass().getSimpleName());
51-
executorService.scheduleAtFixedRate(this::pollSafe, 0, pollingInterval.toMillis(), TimeUnit.MILLISECONDS);
51+
scheduleNext(0);
52+
}
53+
}
54+
55+
private void scheduleNext(long delayMs) {
56+
if (running.get()) {
57+
executorService.schedule(this::pollSafe, delayMs, TimeUnit.MILLISECONDS);
5258
}
5359
}
5460

5561
/**
5662
* Stops the background polling task.
5763
*/
5864
public void stop() {
59-
if (running.compareAndSet(true, false)) {
60-
log.info("Stopping Hiero observer: {}", getClass().getSimpleName());
61-
executorService.shutdown();
62-
}
65+
running.set(false);
66+
log.info("Stopping Hiero observer: {}", getClass().getSimpleName());
6367
}
6468

6569
private void pollSafe() {
6670
try {
67-
poll();
71+
boolean hasMore = poll();
72+
// If there's more data (pagination), poll again immediately.
73+
// Otherwise, wait for the polling interval.
74+
scheduleNext(hasMore ? 0 : pollingInterval.toMillis());
6875
} catch (Exception e) {
6976
log.error("Unexpected error during Hiero polling in {}", getClass().getSimpleName(), e);
77+
scheduleNext(pollingInterval.toMillis());
7078
}
7179
}
7280

7381
/**
7482
* The actual polling logic to be implemented by subclasses.
7583
*
84+
* @return true if there is more data to poll immediately (pagination), false otherwise.
7685
* @throws Exception if polling fails.
7786
*/
78-
public abstract void poll() throws Exception;
87+
public abstract boolean poll() throws Exception;
7988

8089
/**
8190
* Notifies the listener of a new event.

hiero-enterprise-base/src/main/java/org/hiero/base/observer/TopicObserver.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.time.Instant;
66
import java.util.Comparator;
77
import java.util.List;
8+
import java.util.concurrent.ScheduledExecutorService;
89
import org.hiero.base.data.Page;
910
import org.hiero.base.data.TopicMessage;
1011
import org.hiero.base.mirrornode.TopicRepository;
@@ -26,29 +27,36 @@ public class TopicObserver extends AbstractPollingObserver<TopicMessage> {
2627
/**
2728
* Creates a new topic observer.
2829
*
30+
* @param executorService The shared executor service to use for polling.
2931
* @param repository The repository to use for polling.
3032
* @param topicId The topic to monitor.
3133
* @param pollingInterval How often to poll the mirror node.
3234
* @param listener The callback for new messages.
3335
*/
3436
public TopicObserver(
37+
@NonNull ScheduledExecutorService executorService,
3538
@NonNull TopicRepository repository,
3639
@NonNull TopicId topicId,
3740
@NonNull Duration pollingInterval,
3841
@NonNull EventObserver<TopicMessage> listener) {
39-
super(pollingInterval, listener);
42+
super(executorService, pollingInterval, listener);
4043
this.repository = repository;
4144
this.topicId = topicId;
4245
this.lastSeenTimestamp = Instant.now();
4346
}
4447

4548
@Override
46-
public void poll() throws Exception {
49+
public boolean poll() throws Exception {
4750
log.trace("Polling messages for topic {} after {}", topicId, lastSeenTimestamp);
4851

4952
Page<TopicMessage> page = repository.getMessages(topicId, lastSeenTimestamp);
50-
List<TopicMessage> messages = new java.util.ArrayList<>(page.getData());
53+
processPage(page);
54+
55+
return page.hasNext();
56+
}
5157

58+
private void processPage(Page<TopicMessage> page) {
59+
List<TopicMessage> messages = new java.util.ArrayList<>(page.getData());
5260
if (messages.isEmpty()) {
5361
return;
5462
}

hiero-enterprise-base/src/main/java/org/hiero/base/observer/TransactionObserver.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.time.Instant;
66
import java.util.Comparator;
77
import java.util.List;
8+
import java.util.concurrent.ScheduledExecutorService;
89
import org.hiero.base.data.Page;
910
import org.hiero.base.data.TransactionInfo;
1011
import org.hiero.base.mirrornode.TransactionRepository;
@@ -29,30 +30,37 @@ public class TransactionObserver extends AbstractPollingObserver<TransactionInfo
2930
/**
3031
* Creates a new transaction observer.
3132
*
33+
* @param executorService The shared executor service to use for polling.
3234
* @param repository The repository to use for polling.
3335
* @param accountId The account to monitor.
3436
* @param pollingInterval How often to poll the mirror node.
3537
* @param listener The callback for new transactions.
3638
*/
3739
public TransactionObserver(
40+
@NonNull ScheduledExecutorService executorService,
3841
@NonNull TransactionRepository repository,
3942
@NonNull AccountId accountId,
4043
@NonNull Duration pollingInterval,
4144
@NonNull EventObserver<TransactionInfo> listener) {
42-
super(pollingInterval, listener);
45+
super(executorService, pollingInterval, listener);
4346
this.repository = repository;
4447
this.accountId = accountId;
4548
// Start from current time to avoid processing historical transactions by default
4649
this.lastSeenTimestamp = Instant.now();
4750
}
4851

4952
@Override
50-
public void poll() throws Exception {
53+
public boolean poll() throws Exception {
5154
log.trace("Polling transactions for account {} after {}", accountId, lastSeenTimestamp);
5255

5356
Page<TransactionInfo> page = repository.findByAccount(accountId, lastSeenTimestamp);
54-
List<TransactionInfo> transactions = new java.util.ArrayList<>(page.getData());
57+
processPage(page);
58+
59+
return page.hasNext();
60+
}
5561

62+
private void processPage(Page<TransactionInfo> page) {
63+
List<TransactionInfo> transactions = new java.util.ArrayList<>(page.getData());
5664
if (transactions.isEmpty()) {
5765
return;
5866
}

hiero-enterprise-base/src/test/java/org/hiero/base/test/TransactionObserverTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import java.time.Instant;
1010
import java.util.Collections;
1111
import java.util.List;
12+
import java.util.concurrent.Executors;
13+
import java.util.concurrent.ScheduledExecutorService;
1214
import org.hiero.base.data.Page;
1315
import org.hiero.base.data.TransactionInfo;
1416
import org.hiero.base.mirrornode.TransactionRepository;
@@ -30,11 +32,12 @@ class TransactionObserverTest {
3032

3133
private TransactionObserver observer;
3234
private final AccountId accountId = AccountId.fromString("0.0.1234");
35+
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
3336

3437
@BeforeEach
3538
void setUp() {
3639
MockitoAnnotations.openMocks(this);
37-
observer = new TransactionObserver(repository, accountId, Duration.ofMillis(100), listener);
40+
observer = new TransactionObserver(executorService, repository, accountId, Duration.ofMillis(100), listener);
3841
}
3942

4043
@Test

hiero-enterprise-spring-sample/src/main/java/org/hiero/spring/sample/HieroEventListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ public class HieroEventListener {
1919
*/
2020
@HieroTransactionListener(account = "0.0.1234", interval = 10000)
2121
public void onTransaction(TransactionInfo tx) {
22-
log.info(">>> [Hiero Event] New transaction detected: {} (Status: {})",
23-
tx.transactionId(), tx.status());
22+
log.info(">>> [Hiero Event] New transaction detected: {} (Result: {})",
23+
tx.transactionId(), tx.result());
2424
}
2525

2626
/**

hiero-enterprise-spring/src/main/java/org/hiero/spring/implementation/HieroAutoConfiguration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,9 @@ MirrorNodeClient mirrorNodeClient(final HieroContext hieroContext) {
129129
final String mirrorNodeEndpointProtocol = url.getProtocol();
130130
final String mirrorNodeEndpointHost = url.getHost();
131131
final int mirrorNodeEndpointPort;
132-
if (mirrorNodeEndpointProtocol == "https" && url.getPort() == -1) {
132+
if (mirrorNodeEndpointProtocol.equals("https") && url.getPort() == -1) {
133133
mirrorNodeEndpointPort = 443;
134-
} else if (mirrorNodeEndpointProtocol == "http" && url.getPort() == -1) {
134+
} else if (mirrorNodeEndpointProtocol.equals("http") && url.getPort() == -1) {
135135
mirrorNodeEndpointPort = 80;
136136
} else if (url.getPort() == -1) {
137137
mirrorNodeEndpointPort = 443;

hiero-enterprise-spring/src/main/java/org/hiero/spring/implementation/HieroListenerProcessor.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
import org.hiero.spring.annotation.HieroTransactionListener;
1616
import org.slf4j.Logger;
1717
import org.slf4j.LoggerFactory;
18+
import org.springframework.aop.framework.AopProxyUtils;
1819
import org.springframework.beans.BeansException;
1920
import org.springframework.beans.factory.config.BeanPostProcessor;
2021
import org.springframework.context.SmartLifecycle;
22+
import org.springframework.scheduling.TaskScheduler;
23+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
2124
import org.springframework.util.ReflectionUtils;
2225

2326
/**
@@ -30,17 +33,29 @@ public class HieroListenerProcessor implements BeanPostProcessor, SmartLifecycle
3033

3134
private final TransactionRepository transactionRepository;
3235
private final TopicRepository topicRepository;
36+
private final TaskScheduler taskScheduler;
3337
private final List<AbstractPollingObserver<?>> observers = new ArrayList<>();
3438
private boolean running = false;
3539

3640
public HieroListenerProcessor(TransactionRepository transactionRepository, TopicRepository topicRepository) {
3741
this.transactionRepository = transactionRepository;
3842
this.topicRepository = topicRepository;
43+
this.taskScheduler = createDefaultTaskScheduler();
44+
}
45+
46+
private TaskScheduler createDefaultTaskScheduler() {
47+
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
48+
scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
49+
scheduler.setThreadNamePrefix("hiero-observer-");
50+
scheduler.setDaemon(true);
51+
scheduler.initialize();
52+
return scheduler;
3953
}
4054

4155
@Override
4256
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
43-
ReflectionUtils.doWithMethods(bean.getClass(), method -> {
57+
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
58+
ReflectionUtils.doWithMethods(targetClass, method -> {
4459
if (method.isAnnotationPresent(HieroTransactionListener.class)) {
4560
processTransactionListener(bean, method);
4661
}
@@ -56,6 +71,7 @@ private void processTransactionListener(Object bean, Method method) {
5671
log.info("Registering transaction listener for account {} on method {}", annotation.account(), method.getName());
5772

5873
TransactionObserver observer = new TransactionObserver(
74+
((ThreadPoolTaskScheduler) taskScheduler).getScheduledExecutor(),
5975
transactionRepository,
6076
AccountId.fromString(annotation.account()),
6177
Duration.ofMillis(annotation.interval()),
@@ -72,6 +88,7 @@ private void processTopicListener(Object bean, Method method) {
7288
log.info("Registering topic listener for topic {} on method {}", annotation.topicId(), method.getName());
7389

7490
TopicObserver observer = new TopicObserver(
91+
((ThreadPoolTaskScheduler) taskScheduler).getScheduledExecutor(),
7592
topicRepository,
7693
TopicId.fromString(annotation.topicId()),
7794
Duration.ofMillis(annotation.interval()),

0 commit comments

Comments
 (0)