HDDS-14006. EventNotification: create a plugin impl which which publishes events to Kafka#10096
Conversation
|
@ChenSammi : as per #9366 I was unable to reopen that PR and so the only option was to open a new one. |
There was a problem hiding this comment.
Pull request overview
This PR introduces an OM EventNotification plugin framework plus a concrete Kafka publisher implementation intended to publish OM completed-operation events to Kafka, along with build/distribution updates to ship the new plugin module and Kafka client dependency.
Changes:
- Add new OM event listener plugin interfaces and an OM-side plugin manager/context to load and run configured listeners.
- Add a new
ozone-manager-pluginsmodule with a ledger poller helper and a Kafka publisher implementation + unit tests. - Update Maven dependencies and distribution/license manifests to include
kafka-clients,lz4-java, and the new plugin jar.
Reviewed changes
Copilot reviewed 20 out of 22 changed files in this pull request and generated 24 comments.
Show a summary per file
| File | Description |
|---|---|
| pom.xml | Adds Kafka client dependency/version, adds ozone-manager-plugins dependency, and updates dependency-check ignore list. |
| hadoop-ozone/pom.xml | Registers new ozone-manager-plugins module in the build. |
| hadoop-ozone/common/.../eventlistener/* | Introduces OMEventListener and OMEventListenerPluginContext APIs. |
| hadoop-ozone/ozone-manager/.../eventlistener/* | Adds OM plugin manager and plugin context implementation. |
| hadoop-ozone/interface-storage/.../OMMetadataManager.java | Adds listCompletedRequestInfo to metadata manager API. |
| hadoop-ozone/ozone-manager/.../OmMetadataManagerImpl.java | Implements listCompletedRequestInfo (currently with a critical type-comparison bug). |
| hadoop-ozone/ozone-manager-plugins/... | Adds ledger poller helper classes and OMEventListenerKafkaPublisher. |
| hadoop-ozone//test/ | Adds unit tests for plugin manager and Kafka publisher. |
| hadoop-ozone/dist/src/main/license/* | Updates jar-report and LICENSE to account for new shipped artifacts. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| LOG.debug("Running OMEventListenerLedgerPoller"); | ||
| } | ||
| if (runCount.get() == 0) { | ||
| seekPosition.initSeekPosition(); |
There was a problem hiding this comment.
seekPosition.initSeekPosition() is called on the first run but its return value is ignored, and OMEventListenerLedgerPollerSeekPosition#initSeekPosition() does not update internal state. As written, this first-run init is a no-op; either have initSeekPosition() update the AtomicReference, or call seekPosition.set(...) with the loaded value.
| seekPosition.initSeekPosition(); | |
| seekPosition.set(seekPosition.initSeekPosition()); |
| LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," + | ||
| "serviceTimeout={}, kafkaProps={}, seekPosition={}", | ||
| kafkaServiceInterval, kafkaServiceTimeout, kafkaProps, | ||
| seekPosition); |
There was a problem hiding this comment.
Logging the full kafkaProps can leak sensitive values (eg SASL passwords, tokens). Avoid logging the entire properties map at INFO; log only non-sensitive keys (or redact known sensitive ones) and consider lowering to DEBUG.
| private static Class<? extends OMEventListener> resolvePluginClass(OzoneConfiguration conf, | ||
| String destName) { | ||
| String classnameProp = PLUGIN_DEST_BASE + destName + ".classname"; | ||
| LOG.info("Gettting classname for {} with propety {}", destName, classnameProp); |
There was a problem hiding this comment.
Typo in log message: "Gettting" -> "Getting".
| LOG.info("Gettting classname for {} with propety {}", destName, classnameProp); | |
| LOG.info("Getting classname for {} with property {}", destName, classnameProp); |
| try (MockedConstruction<OMEventListenerKafkaPublisher.KafkaClientWrapper> mockeKafkaClientWrapper = | ||
| mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) { | ||
|
|
||
| plugin.initialize(conf, pluginContext); | ||
| plugin.handleCompletedRequest(op); | ||
|
|
||
| OMEventListenerKafkaPublisher.KafkaClientWrapper mock = mockeKafkaClientWrapper.constructed().get(0); |
There was a problem hiding this comment.
Typo in variable name mockeKafkaClientWrapper (missing d). Consider renaming to mockedKafkaClientWrapper for clarity.
| try (MockedConstruction<OMEventListenerKafkaPublisher.KafkaClientWrapper> mockeKafkaClientWrapper = | |
| mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) { | |
| plugin.initialize(conf, pluginContext); | |
| plugin.handleCompletedRequest(op); | |
| OMEventListenerKafkaPublisher.KafkaClientWrapper mock = mockeKafkaClientWrapper.constructed().get(0); | |
| try (MockedConstruction<OMEventListenerKafkaPublisher.KafkaClientWrapper> mockedKafkaClientWrapper = | |
| mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) { | |
| plugin.initialize(conf, pluginContext); | |
| plugin.handleCompletedRequest(op); | |
| OMEventListenerKafkaPublisher.KafkaClientWrapper mock = mockedKafkaClientWrapper.constructed().get(0); |
| private static Class<? extends OMEventListener> resolvePluginClass(OzoneConfiguration conf, | ||
| String destName) { | ||
| String classnameProp = PLUGIN_DEST_BASE + destName + ".classname"; | ||
| LOG.info("Gettting classname for {} with propety {}", destName, classnameProp); |
There was a problem hiding this comment.
Typo in log message: "propety" -> "property".
| LOG.info("Gettting classname for {} with propety {}", destName, classnameProp); | |
| LOG.info("Gettting classname for {} with property {}", destName, classnameProp); |
| OMEventListener impl = cls.newInstance(); | ||
| impl.initialize(conf, pluginContext); | ||
|
|
||
| plugins.add(impl); | ||
| } catch (Exception ex) { | ||
| LOG.error("Can't make instance of event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex); |
There was a problem hiding this comment.
Class#newInstance() is deprecated and can mask constructor exceptions. Prefer cls.getDeclaredConstructor().newInstance() and handle the reflective exceptions explicitly.
| OMEventListener impl = cls.newInstance(); | |
| impl.initialize(conf, pluginContext); | |
| plugins.add(impl); | |
| } catch (Exception ex) { | |
| LOG.error("Can't make instance of event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex); | |
| OMEventListener impl = cls.getDeclaredConstructor().newInstance(); | |
| impl.initialize(conf, pluginContext); | |
| plugins.add(impl); | |
| } catch (ReflectiveOperationException ex) { | |
| LOG.error("Can't make instance of event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex); | |
| } catch (Exception ex) { | |
| LOG.error("Can't initialize event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex); |
| } | ||
|
|
||
| public void shutdown() throws IOException { | ||
| producer.close(); |
There was a problem hiding this comment.
KafkaClientWrapper.shutdown() calls producer.close() without a null check. If initialization failed (or start() was never called), this can throw NPE and is not caught by the caller (it only catches IOException). Guard producer and make shutdown idempotent.
| producer.close(); | |
| KafkaProducer<String, String> currentProducer = producer; | |
| producer = null; | |
| if (currentProducer != null) { | |
| currentProducer.close(); | |
| } |
| import org.mockito.junit.jupiter.MockitoExtension; | ||
|
|
||
| /** | ||
| * Tests {@link OMEventListenerPluginManager}. |
There was a problem hiding this comment.
Javadoc references the wrong class: this test suite targets OMEventListenerKafkaPublisher, not OMEventListenerPluginManager. Update the link to avoid confusion.
| * Tests {@link OMEventListenerPluginManager}. | |
| * Tests {@link OMEventListenerKafkaPublisher}. |
| } | ||
|
|
||
| @Test | ||
| public void testRenameRequestProducesS3CreateAndDeleteEvents() throws InterruptedException, IOException { |
There was a problem hiding this comment.
Test name mentions producing both create and delete events, but the assertions only validate a single event. Rename the test (or adjust expectations) so it reflects the behavior being verified.
| public void testRenameRequestProducesS3CreateAndDeleteEvents() throws InterruptedException, IOException { | |
| public void testRenameRequestProducesRenameKeyEvent() throws InterruptedException, IOException { |
|
|
||
| super("OMEventListenerLedgerPoller", | ||
| interval, | ||
| TimeUnit.MILLISECONDS, |
There was a problem hiding this comment.
The constructor takes a TimeUnit unit but the BackgroundService super constructor is called with TimeUnit.MILLISECONDS, ignoring the passed unit. Pass unit through (and convert interval appropriately) or remove the unit parameter to avoid incorrect scheduling.
| TimeUnit.MILLISECONDS, | |
| unit, |
…mentation which publishes events to kafka
Also make the seek position string be deserialized from a string to a long earlier in the process so it doesn't have to be deserialized in multiple places and the DB list calls can just take a Long as the seek position
Please describe your PR in detail:
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-14006
How was this patch tested?
unit tests