Skip to content

HDDS-14006. EventNotification: create a plugin impl which which publishes events to Kafka#10096

Open
gardenia wants to merge 3 commits intoapache:HDDS-13513_Event_Notification_FeatureBranchfrom
gardenia:HDDS-14006
Open

HDDS-14006. EventNotification: create a plugin impl which which publishes events to Kafka#10096
gardenia wants to merge 3 commits intoapache:HDDS-13513_Event_Notification_FeatureBranchfrom
gardenia:HDDS-14006

Conversation

@gardenia
Copy link
Copy Markdown

Please describe your PR in detail:

  • Create an OMEventListener plugin implementation which publishes events to Kafka
  • a helper class OMEventListenerLedgerPoller is provided which plugin implementations can use to periodically poll for newly written CompletedRequestInfo records table and passes them them to a callback
  • OMEventListenerKafkaPublisher is a concrete implementation of OMEventListener which consumes the latest CompletedRequestInfo records, serializes them to a string and sends them to a configured kafka broker.

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/HDDS-14006

How was this patch tested?

unit tests

@gardenia
Copy link
Copy Markdown
Author

@ChenSammi : as per #9366 I was unable to reopen that PR and so the only option was to open a new one.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces an OM EventNotification plugin framework plus a concrete Kafka publisher implementation intended to publish OM completed-operation events to Kafka, along with build/distribution updates to ship the new plugin module and Kafka client dependency.

Changes:

  • Add new OM event listener plugin interfaces and an OM-side plugin manager/context to load and run configured listeners.
  • Add a new ozone-manager-plugins module with a ledger poller helper and a Kafka publisher implementation + unit tests.
  • Update Maven dependencies and distribution/license manifests to include kafka-clients, lz4-java, and the new plugin jar.

Reviewed changes

Copilot reviewed 20 out of 22 changed files in this pull request and generated 24 comments.

Show a summary per file
File Description
pom.xml Adds Kafka client dependency/version, adds ozone-manager-plugins dependency, and updates dependency-check ignore list.
hadoop-ozone/pom.xml Registers new ozone-manager-plugins module in the build.
hadoop-ozone/common/.../eventlistener/* Introduces OMEventListener and OMEventListenerPluginContext APIs.
hadoop-ozone/ozone-manager/.../eventlistener/* Adds OM plugin manager and plugin context implementation.
hadoop-ozone/interface-storage/.../OMMetadataManager.java Adds listCompletedRequestInfo to metadata manager API.
hadoop-ozone/ozone-manager/.../OmMetadataManagerImpl.java Implements listCompletedRequestInfo (currently with a critical type-comparison bug).
hadoop-ozone/ozone-manager-plugins/... Adds ledger poller helper classes and OMEventListenerKafkaPublisher.
hadoop-ozone//test/ Adds unit tests for plugin manager and Kafka publisher.
hadoop-ozone/dist/src/main/license/* Updates jar-report and LICENSE to account for new shipped artifacts.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

LOG.debug("Running OMEventListenerLedgerPoller");
}
if (runCount.get() == 0) {
seekPosition.initSeekPosition();
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seekPosition.initSeekPosition() is called on the first run but its return value is ignored, and OMEventListenerLedgerPollerSeekPosition#initSeekPosition() does not update internal state. As written, this first-run init is a no-op; either have initSeekPosition() update the AtomicReference, or call seekPosition.set(...) with the loaded value.

Suggested change
seekPosition.initSeekPosition();
seekPosition.set(seekPosition.initSeekPosition());

Copilot uses AI. Check for mistakes.
Comment on lines +62 to +65
LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," +
"serviceTimeout={}, kafkaProps={}, seekPosition={}",
kafkaServiceInterval, kafkaServiceTimeout, kafkaProps,
seekPosition);
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging the full kafkaProps can leak sensitive values (eg SASL passwords, tokens). Avoid logging the entire properties map at INFO; log only non-sensitive keys (or redact known sensitive ones) and consider lowering to DEBUG.

Copilot uses AI. Check for mistakes.
private static Class<? extends OMEventListener> resolvePluginClass(OzoneConfiguration conf,
String destName) {
String classnameProp = PLUGIN_DEST_BASE + destName + ".classname";
LOG.info("Gettting classname for {} with propety {}", destName, classnameProp);
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in log message: "Gettting" -> "Getting".

Suggested change
LOG.info("Gettting classname for {} with propety {}", destName, classnameProp);
LOG.info("Getting classname for {} with property {}", destName, classnameProp);

Copilot uses AI. Check for mistakes.
Comment on lines +88 to +94
try (MockedConstruction<OMEventListenerKafkaPublisher.KafkaClientWrapper> mockeKafkaClientWrapper =
mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) {

plugin.initialize(conf, pluginContext);
plugin.handleCompletedRequest(op);

OMEventListenerKafkaPublisher.KafkaClientWrapper mock = mockeKafkaClientWrapper.constructed().get(0);
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in variable name mockeKafkaClientWrapper (missing d). Consider renaming to mockedKafkaClientWrapper for clarity.

Suggested change
try (MockedConstruction<OMEventListenerKafkaPublisher.KafkaClientWrapper> mockeKafkaClientWrapper =
mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) {
plugin.initialize(conf, pluginContext);
plugin.handleCompletedRequest(op);
OMEventListenerKafkaPublisher.KafkaClientWrapper mock = mockeKafkaClientWrapper.constructed().get(0);
try (MockedConstruction<OMEventListenerKafkaPublisher.KafkaClientWrapper> mockedKafkaClientWrapper =
mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) {
plugin.initialize(conf, pluginContext);
plugin.handleCompletedRequest(op);
OMEventListenerKafkaPublisher.KafkaClientWrapper mock = mockedKafkaClientWrapper.constructed().get(0);

Copilot uses AI. Check for mistakes.
private static Class<? extends OMEventListener> resolvePluginClass(OzoneConfiguration conf,
String destName) {
String classnameProp = PLUGIN_DEST_BASE + destName + ".classname";
LOG.info("Gettting classname for {} with propety {}", destName, classnameProp);
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in log message: "propety" -> "property".

Suggested change
LOG.info("Gettting classname for {} with propety {}", destName, classnameProp);
LOG.info("Gettting classname for {} with property {}", destName, classnameProp);

Copilot uses AI. Check for mistakes.
Comment on lines +91 to +96
OMEventListener impl = cls.newInstance();
impl.initialize(conf, pluginContext);

plugins.add(impl);
} catch (Exception ex) {
LOG.error("Can't make instance of event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex);
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Class#newInstance() is deprecated and can mask constructor exceptions. Prefer cls.getDeclaredConstructor().newInstance() and handle the reflective exceptions explicitly.

Suggested change
OMEventListener impl = cls.newInstance();
impl.initialize(conf, pluginContext);
plugins.add(impl);
} catch (Exception ex) {
LOG.error("Can't make instance of event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex);
OMEventListener impl = cls.getDeclaredConstructor().newInstance();
impl.initialize(conf, pluginContext);
plugins.add(impl);
} catch (ReflectiveOperationException ex) {
LOG.error("Can't make instance of event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex);
} catch (Exception ex) {
LOG.error("Can't initialize event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex);

Copilot uses AI. Check for mistakes.
}

public void shutdown() throws IOException {
producer.close();
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KafkaClientWrapper.shutdown() calls producer.close() without a null check. If initialization failed (or start() was never called), this can throw NPE and is not caught by the caller (it only catches IOException). Guard producer and make shutdown idempotent.

Suggested change
producer.close();
KafkaProducer<String, String> currentProducer = producer;
producer = null;
if (currentProducer != null) {
currentProducer.close();
}

Copilot uses AI. Check for mistakes.
import org.mockito.junit.jupiter.MockitoExtension;

/**
* Tests {@link OMEventListenerPluginManager}.
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadoc references the wrong class: this test suite targets OMEventListenerKafkaPublisher, not OMEventListenerPluginManager. Update the link to avoid confusion.

Suggested change
* Tests {@link OMEventListenerPluginManager}.
* Tests {@link OMEventListenerKafkaPublisher}.

Copilot uses AI. Check for mistakes.
}

@Test
public void testRenameRequestProducesS3CreateAndDeleteEvents() throws InterruptedException, IOException {
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test name mentions producing both create and delete events, but the assertions only validate a single event. Rename the test (or adjust expectations) so it reflects the behavior being verified.

Suggested change
public void testRenameRequestProducesS3CreateAndDeleteEvents() throws InterruptedException, IOException {
public void testRenameRequestProducesRenameKeyEvent() throws InterruptedException, IOException {

Copilot uses AI. Check for mistakes.

super("OMEventListenerLedgerPoller",
interval,
TimeUnit.MILLISECONDS,
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor takes a TimeUnit unit but the BackgroundService super constructor is called with TimeUnit.MILLISECONDS, ignoring the passed unit. Pass unit through (and convert interval appropriately) or remove the unit parameter to avoid incorrect scheduling.

Suggested change
TimeUnit.MILLISECONDS,
unit,

Copilot uses AI. Check for mistakes.
Colm Dougan added 2 commits April 30, 2026 13:58
Also make the seek position string be deserialized from a string to a
long earlier in the process so it doesn't have to be deserialized in
multiple places and the DB list calls can just take a Long as the seek
position
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants