Skip to content

Commit a40af9b

Browse files
committed
feat: implement annotation-driven event observer framework
1 parent cc5ab2c commit a40af9b

17 files changed

Lines changed: 748 additions & 2 deletions

File tree

hiero-enterprise-base/src/main/java/org/hiero/base/implementation/TopicRepositoryImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,14 @@ public TopicRepositoryImpl(@NonNull final MirrorNodeClient mirrorNodeClient) {
3131
return mirrorNodeClient.queryTopicMessages(topicId);
3232
}
3333

34+
@Override
35+
public @NonNull Page<TopicMessage> getMessages(@NonNull TopicId topicId, @NonNull java.time.Instant after)
36+
throws HieroException {
37+
Objects.requireNonNull(topicId, "topicId must not be null");
38+
Objects.requireNonNull(after, "after must not be null");
39+
return mirrorNodeClient.queryTopicMessages(topicId, after);
40+
}
41+
3442
@Override
3543
public @NonNull Optional<TopicMessage> getMessageBySequenceNumber(
3644
TopicId topicId, long sequenceNumber) throws HieroException {

hiero-enterprise-base/src/main/java/org/hiero/base/implementation/TransactionRepositoryImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,14 @@ public Page<TransactionInfo> findByAccount(@NonNull final AccountId accountId)
2929
return this.mirrorNodeClient.queryTransactionsByAccount(accountId);
3030
}
3131

32+
@Override
33+
public @NonNull Page<TransactionInfo> findByAccount(
34+
@NonNull AccountId accountId, @NonNull java.time.Instant after) throws HieroException {
35+
Objects.requireNonNull(accountId, "accountId must not be null");
36+
Objects.requireNonNull(after, "after must not be null");
37+
return this.mirrorNodeClient.queryTransactionsByAccount(accountId, after);
38+
}
39+
3240
@Override
3341
public @NonNull Page<TransactionInfo> findByAccountAndType(
3442
@NonNull AccountId accountId, @NonNull TransactionType type) throws HieroException {

hiero-enterprise-base/src/main/java/org/hiero/base/mirrornode/MirrorNodeClient.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,18 @@ default Optional<Nft> queryNftsByAccountAndTokenIdAndSerial(
175175
@NonNull Page<TransactionInfo> queryTransactionsByAccount(@NonNull AccountId accountId)
176176
throws HieroException;
177177

178+
/**
179+
* Queries transactions for a specific account after a certain timestamp.
180+
*
181+
* @param accountId the account ID to query transactions for
182+
* @param after the timestamp to start querying from (exclusive)
183+
* @return a page of transaction information
184+
* @throws HieroException if an error occurs during the query
185+
*/
186+
@NonNull
187+
Page<TransactionInfo> queryTransactionsByAccount(@NonNull AccountId accountId, @NonNull java.time.Instant after)
188+
throws HieroException;
189+
178190
/**
179191
* Queries all transactions for a specific account and transaction type.
180192
*
@@ -396,6 +408,18 @@ default Optional<Topic> queryTopicById(String topicId) throws HieroException {
396408
*/
397409
@NonNull Page<TopicMessage> queryTopicMessages(TopicId topicId) throws HieroException;
398410

411+
/**
412+
* Return TopicMessages for given topicId after a certain timestamp.
413+
*
414+
* @param topicId id of the topic
415+
* @param after the timestamp to start querying from (exclusive)
416+
* @return Page of TopicMessage
417+
* @throws HieroException if the search fails
418+
*/
419+
@NonNull
420+
Page<TopicMessage> queryTopicMessages(@NonNull TopicId topicId, @NonNull java.time.Instant after)
421+
throws HieroException;
422+
399423
/**
400424
* Return TopicMessages for given topicId.
401425
*

hiero-enterprise-base/src/main/java/org/hiero/base/mirrornode/TopicRepository.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.hiero.base.mirrornode;
22

33
import com.hedera.hashgraph.sdk.TopicId;
4+
import java.time.Instant;
45
import java.util.Objects;
56
import java.util.Optional;
67
import org.hiero.base.HieroException;
@@ -57,7 +58,34 @@ default Page<TopicMessage> getMessages(String topicId) throws HieroException {
5758
Objects.requireNonNull(topicId, "topicId must not be null");
5859
return getMessages(TopicId.fromString(topicId));
5960
}
60-
;
61+
62+
/**
63+
* Return TopicMessages for given topicId after a certain timestamp.
64+
*
65+
* @param topicId id of the topic
66+
* @param after the timestamp to start querying from (exclusive)
67+
* @return Page of TopicMessage
68+
* @throws HieroException if the search fails
69+
*/
70+
@NonNull
71+
Page<TopicMessage> getMessages(@NonNull TopicId topicId, @NonNull Instant after)
72+
throws HieroException;
73+
74+
/**
75+
* Return TopicMessages for given topicId after a certain timestamp.
76+
*
77+
* @param topicId id of the topic
78+
* @param after the timestamp to start querying from (exclusive)
79+
* @return Page of TopicMessage
80+
* @throws HieroException if the search fails
81+
*/
82+
@NonNull
83+
default Page<TopicMessage> getMessages(@NonNull String topicId, @NonNull Instant after)
84+
throws HieroException {
85+
Objects.requireNonNull(topicId, "topicId must not be null");
86+
Objects.requireNonNull(after, "after must not be null");
87+
return getMessages(TopicId.fromString(topicId), after);
88+
}
6189

6290
/**
6391
* Return TopicMessage for given topicId.

hiero-enterprise-base/src/main/java/org/hiero/base/mirrornode/TransactionRepository.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.hiero.base.mirrornode;
22

33
import com.hedera.hashgraph.sdk.AccountId;
4+
import java.time.Instant;
45
import java.util.Objects;
56
import java.util.Optional;
67
import org.hiero.base.HieroException;
@@ -38,6 +39,34 @@ default Page<TransactionInfo> findByAccount(@NonNull String accountId) throws Hi
3839
return findByAccount(AccountId.fromString(accountId));
3940
}
4041

42+
/**
43+
* Find all transactions associated with a specific account after a certain timestamp.
44+
*
45+
* @param accountId id of the account
46+
* @param after the timestamp to start querying from (exclusive)
47+
* @return page of transactions
48+
* @throws HieroException if the search fails
49+
*/
50+
@NonNull
51+
Page<TransactionInfo> findByAccount(@NonNull AccountId accountId, @NonNull Instant after)
52+
throws HieroException;
53+
54+
/**
55+
* Find all transactions associated with a specific account after a certain timestamp.
56+
*
57+
* @param accountId id of the account as a string
58+
* @param after the timestamp to start querying from (exclusive)
59+
* @return page of transactions
60+
* @throws HieroException if the search fails
61+
*/
62+
@NonNull
63+
default Page<TransactionInfo> findByAccount(@NonNull String accountId, @NonNull Instant after)
64+
throws HieroException {
65+
Objects.requireNonNull(accountId, "accountId must not be null");
66+
Objects.requireNonNull(after, "after must not be null");
67+
return findByAccount(AccountId.fromString(accountId), after);
68+
}
69+
4170
/**
4271
* Find all transactions associated with a specific account and has specific transaction type.
4372
*
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package org.hiero.base.observer;
2+
3+
import java.time.Duration;
4+
import java.util.concurrent.Executors;
5+
import java.util.concurrent.ScheduledExecutorService;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicBoolean;
8+
import org.jspecify.annotations.NonNull;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
/**
13+
* Base implementation for observers that poll the Mirror Node at regular intervals.
14+
*
15+
* <p>This class handles the lifecycle of the background polling task, including
16+
* starting, stopping, and handling unexpected errors during polling.
17+
*
18+
* @param <T> The type of event being observed.
19+
*/
20+
public abstract class AbstractPollingObserver<T> {
21+
22+
private static final Logger log = LoggerFactory.getLogger(AbstractPollingObserver.class);
23+
24+
private final ScheduledExecutorService executorService;
25+
private final Duration pollingInterval;
26+
private final EventObserver<T> listener;
27+
private final AtomicBoolean running = new AtomicBoolean(false);
28+
29+
/**
30+
* Creates a new polling observer.
31+
*
32+
* @param pollingInterval How often to check for new events.
33+
* @param listener The callback to trigger when an event is found.
34+
*/
35+
protected AbstractPollingObserver(@NonNull Duration pollingInterval, @NonNull EventObserver<T> listener) {
36+
this.pollingInterval = pollingInterval;
37+
this.listener = listener;
38+
this.executorService = Executors.newSingleThreadScheduledExecutor(r -> {
39+
Thread thread = new Thread(r, "hiero-observer-" + getClass().getSimpleName());
40+
thread.setDaemon(true);
41+
return thread;
42+
});
43+
}
44+
45+
/**
46+
* Starts the background polling task.
47+
*/
48+
public void start() {
49+
if (running.compareAndSet(false, true)) {
50+
log.info("Starting Hiero observer: {}", getClass().getSimpleName());
51+
executorService.scheduleAtFixedRate(this::pollSafe, 0, pollingInterval.toMillis(), TimeUnit.MILLISECONDS);
52+
}
53+
}
54+
55+
/**
56+
* Stops the background polling task.
57+
*/
58+
public void stop() {
59+
if (running.compareAndSet(true, false)) {
60+
log.info("Stopping Hiero observer: {}", getClass().getSimpleName());
61+
executorService.shutdown();
62+
}
63+
}
64+
65+
private void pollSafe() {
66+
try {
67+
poll();
68+
} catch (Exception e) {
69+
log.error("Unexpected error during Hiero polling in {}", getClass().getSimpleName(), e);
70+
}
71+
}
72+
73+
/**
74+
* The actual polling logic to be implemented by subclasses.
75+
*
76+
* @throws Exception if polling fails.
77+
*/
78+
protected abstract void poll() throws Exception;
79+
80+
/**
81+
* Notifies the listener of a new event.
82+
*
83+
* @param event The event to notify.
84+
*/
85+
protected void notifyListener(@NonNull T event) {
86+
try {
87+
listener.onEvent(event);
88+
} catch (Exception e) {
89+
log.error("Error in event listener callback", e);
90+
}
91+
}
92+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package org.hiero.base.observer;
2+
3+
/**
4+
* Functional interface for a component that observes events on the Hiero network.
5+
*
6+
* <p>Implementations of this interface typically run in a background thread or are
7+
* triggered by a scheduler to check for new network activities.
8+
*
9+
* @param <T> The type of event being observed (e.g., TransactionInfo, TopicMessage).
10+
*/
11+
@FunctionalInterface
12+
public interface EventObserver<T> {
13+
14+
/**
15+
* Called when a new event is detected.
16+
*
17+
* @param event The detected event.
18+
*/
19+
void onEvent(T event);
20+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package org.hiero.base.observer;
2+
3+
import com.hedera.hashgraph.sdk.TopicId;
4+
import java.time.Duration;
5+
import java.time.Instant;
6+
import java.util.Comparator;
7+
import java.util.List;
8+
import org.hiero.base.data.Page;
9+
import org.hiero.base.data.TopicMessage;
10+
import org.hiero.base.mirrornode.TopicRepository;
11+
import org.jspecify.annotations.NonNull;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
/**
16+
* Observer that polls for new messages on a specific Hiero Consensus Service (HCS) topic.
17+
*/
18+
public class TopicObserver extends AbstractPollingObserver<TopicMessage> {
19+
20+
private static final Logger log = LoggerFactory.getLogger(TopicObserver.class);
21+
22+
private final TopicRepository repository;
23+
private final TopicId topicId;
24+
private Instant lastSeenTimestamp;
25+
26+
/**
27+
* Creates a new topic observer.
28+
*
29+
* @param repository The repository to use for polling.
30+
* @param topicId The topic to monitor.
31+
* @param pollingInterval How often to poll the mirror node.
32+
* @param listener The callback for new messages.
33+
*/
34+
public TopicObserver(
35+
@NonNull TopicRepository repository,
36+
@NonNull TopicId topicId,
37+
@NonNull Duration pollingInterval,
38+
@NonNull EventObserver<TopicMessage> listener) {
39+
super(pollingInterval, listener);
40+
this.repository = repository;
41+
this.topicId = topicId;
42+
this.lastSeenTimestamp = Instant.now();
43+
}
44+
45+
@Override
46+
protected void poll() throws Exception {
47+
log.trace("Polling messages for topic {} after {}", topicId, lastSeenTimestamp);
48+
49+
Page<TopicMessage> page = repository.getMessages(topicId, lastSeenTimestamp);
50+
List<TopicMessage> messages = page.getData();
51+
52+
if (messages.isEmpty()) {
53+
return;
54+
}
55+
56+
// Sort by timestamp to ensure chronological delivery
57+
messages.sort(Comparator.comparing(TopicMessage::consensusTimestamp));
58+
59+
for (TopicMessage msg : messages) {
60+
notifyListener(msg);
61+
if (msg.consensusTimestamp().isAfter(lastSeenTimestamp)) {
62+
lastSeenTimestamp = msg.consensusTimestamp();
63+
}
64+
}
65+
}
66+
67+
/**
68+
* Sets the starting timestamp for the observer.
69+
*
70+
* @param timestamp The timestamp to start from.
71+
* @return this observer for chaining.
72+
*/
73+
public TopicObserver startFrom(@NonNull Instant timestamp) {
74+
this.lastSeenTimestamp = timestamp;
75+
return this;
76+
}
77+
}

0 commit comments

Comments
 (0)