Skip to content

Commit 4bce19b

Browse files
author
Colm Dougan
committed
HDDS-14006. EventNotification: Created an OMEventListener plugin implementation which publishes events to Kafka
1 parent f5db45a commit 4bce19b

15 files changed

Lines changed: 746 additions & 3 deletions

File tree

hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,22 @@
1717

1818
package org.apache.hadoop.ozone.om.eventlistener;
1919

20+
import java.io.IOException;
21+
import java.util.List;
22+
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
23+
2024
/**
 * A narrow set of functionality we are ok with exposing to plugin
 * implementations.
 */
public interface OMEventListenerPluginContext {

  /**
   * Returns whether this OM instance is currently the ready leader.
   * NOTE(review): semantics inferred from the name — plugins presumably use
   * this to avoid acting on a follower; confirm against the implementation.
   */
  boolean isLeaderReady();

  // TODO: should we allow plugins to pass in maxResults or just limit
  // them to some predefined value for safety? e.g. 10K
  /**
   * Lists completed request info entries.
   *
   * @param startKey the key to resume listing from; this key itself is
   *     excluded from the result.
   * @param maxResults the maximum number of entries to return.
   * @return a list of {@link OmCompletedRequestInfo}.
   * @throws IOException if reading the completed-request ledger fails.
   */
  List<OmCompletedRequestInfo> listCompletedRequestInfo(String startKey, int maxResults) throws IOException;

  // XXX: this probably doesn't belong here
  /** Returns the thread-name prefix of the hosting OM (used for naming plugin threads). */
  String getThreadNamePrefix();
}

hadoop-ozone/dist/src/main/license/bin/LICENSE.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ CDDL 1.1 + GPLv2 with classpath exception
262262
Apache License 2.0
263263
=====================
264264

265+
at.yawk.lz4:lz4-java
265266
ch.qos.reload4j:reload4j
266267
com.amazonaws:aws-java-sdk-core
267268
com.amazonaws:aws-java-sdk-kms
@@ -384,6 +385,7 @@ Apache License 2.0
384385
org.apache.hadoop:hadoop-shaded-guava
385386
org.apache.hadoop:hadoop-shaded-protobuf_3_25
386387
org.apache.httpcomponents:httpcore
388+
org.apache.kafka:kafka-clients
387389
org.apache.kerby:kerb-admin
388390
org.apache.kerby:kerb-client
389391
org.apache.kerby:kerb-common

hadoop-ozone/dist/src/main/license/jar-report.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ share/ozone/lib/json-simple.jar
159159
share/ozone/lib/jsp-api.jar
160160
share/ozone/lib/jspecify.jar
161161
share/ozone/lib/jsr311-api.jar
162+
share/ozone/lib/kafka-clients.jar
162163
share/ozone/lib/kerb-core.jar
163164
share/ozone/lib/kerb-crypto.jar
164165
share/ozone/lib/kerb-util.jar
@@ -170,6 +171,7 @@ share/ozone/lib/kotlin-stdlib.jar
170171
share/ozone/lib/listenablefuture-empty-to-avoid-conflict-with-guava.jar
171172
share/ozone/lib/log4j-api.jar
172173
share/ozone/lib/log4j-core.jar
174+
share/ozone/lib/lz4-java.jar
173175
share/ozone/lib/metrics-core.jar
174176
share/ozone/lib/netty-buffer.Final.jar
175177
share/ozone/lib/netty-codec.Final.jar
@@ -227,6 +229,7 @@ share/ozone/lib/ozone-insight.jar
227229
share/ozone/lib/ozone-interface-client.jar
228230
share/ozone/lib/ozone-interface-storage.jar
229231
share/ozone/lib/ozone-manager.jar
232+
share/ozone/lib/ozone-manager-plugins.jar
230233
share/ozone/lib/ozone-multitenancy-ranger.jar
231234
share/ozone/lib/ozone-reconcodegen.jar
232235
share/ozone/lib/ozone-recon.jar

hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,17 @@ ListSnapshotResponse listSnapshot(
345345
List<OmVolumeArgs> listVolumes(String userName, String prefix,
346346
String startKey, int maxKeys) throws IOException;
347347

348+
/**
 * Returns a list of operation info objects.
 *
 * @param startKey the start key determines where to start listing
 * from, this key is excluded from the result.
 * @param maxResults the maximum number of results to return.
 * @return a list of {@link OmCompletedRequestInfo}
 * @throws IOException if reading from the underlying metadata store fails.
 */
List<OmCompletedRequestInfo> listCompletedRequestInfo(String startKey, int maxResults) throws IOException;
358+
348359
/**
349360
* Returns the names of up to {@code count} open keys whose age is
350361
* greater than or equal to {@code expireThreshold}.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License. See accompanying LICENSE file.
14+
-->
15+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
16+
<modelVersion>4.0.0</modelVersion>
17+
<parent>
18+
<groupId>org.apache.ozone</groupId>
19+
<artifactId>ozone</artifactId>
20+
<version>2.2.0-SNAPSHOT</version>
21+
</parent>
22+
<artifactId>ozone-manager-plugins</artifactId>
23+
<version>2.2.0-SNAPSHOT</version>
24+
<packaging>jar</packaging>
25+
<name>Apache Ozone Manager Plugins</name>
26+
<properties>
27+
<classpath.skip>false</classpath.skip>
28+
<file.encoding>UTF-8</file.encoding>
29+
</properties>
30+
31+
<dependencies>
32+
<dependency>
33+
<groupId>com.google.guava</groupId>
34+
<artifactId>guava</artifactId>
35+
</dependency>
36+
<dependency>
37+
<groupId>org.apache.hadoop</groupId>
38+
<artifactId>hadoop-common</artifactId>
39+
</dependency>
40+
<dependency>
41+
<groupId>org.apache.kafka</groupId>
42+
<artifactId>kafka-clients</artifactId>
43+
</dependency>
44+
<dependency>
45+
<groupId>org.apache.ozone</groupId>
46+
<artifactId>hdds-common</artifactId>
47+
</dependency>
48+
<dependency>
49+
<groupId>org.apache.ozone</groupId>
50+
<artifactId>hdds-server-framework</artifactId>
51+
</dependency>
52+
<dependency>
53+
<groupId>org.apache.ozone</groupId>
54+
<artifactId>ozone-common</artifactId>
55+
</dependency>
56+
<dependency>
57+
<groupId>org.apache.ozone</groupId>
58+
<artifactId>ozone-interface-client</artifactId>
59+
</dependency>
60+
<dependency>
61+
<groupId>org.slf4j</groupId>
62+
<artifactId>slf4j-api</artifactId>
63+
</dependency>
64+
</dependencies>
65+
66+
<build>
67+
<plugins>
68+
<plugin>
69+
<groupId>org.apache.maven.plugins</groupId>
70+
<artifactId>maven-compiler-plugin</artifactId>
71+
<configuration>
72+
<proc>none</proc>
73+
</configuration>
74+
</plugin>
75+
</plugins>
76+
</build>
77+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hadoop.ozone.om.eventlistener;
19+
20+
import java.io.IOException;
21+
import java.util.Collections;
22+
import java.util.Map;
23+
import java.util.Properties;
24+
import java.util.concurrent.TimeUnit;
25+
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
26+
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
27+
import org.apache.kafka.clients.admin.AdminClient;
28+
import org.apache.kafka.clients.admin.NewTopic;
29+
import org.apache.kafka.clients.producer.KafkaProducer;
30+
import org.apache.kafka.clients.producer.ProducerRecord;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
/**
35+
* This is an implementation of OMEventListener which uses the
36+
* OMEventListenerLedgerPoller as a building block to periodically poll/consume
37+
* completed operations, serialize them to a S3 schema and produce them
38+
* to a kafka topic.
39+
*/
40+
public class OMEventListenerKafkaPublisher implements OMEventListener {
41+
public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class);
42+
43+
private static final String KAFKA_CONFIG_PREFIX = "ozone.notify.kafka.";
44+
private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1;
45+
46+
private OMEventListenerLedgerPoller ledgerPoller;
47+
private KafkaClientWrapper kafkaClient;
48+
private OMEventListenerLedgerPollerSeekPosition seekPosition;
49+
50+
@Override
51+
public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) {
52+
Map<String, String> kafkaPropsMap = conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX);
53+
Properties kafkaProps = new Properties();
54+
kafkaProps.putAll(kafkaPropsMap);
55+
56+
this.kafkaClient = new KafkaClientWrapper(kafkaProps);
57+
58+
// TODO: these constants should be read from config
59+
long kafkaServiceInterval = 2 * 1000;
60+
long kafkaServiceTimeout = 300 * 1000;
61+
62+
LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," +
63+
"serviceTimeout={}, kafkaProps={}, seekPosition={}",
64+
kafkaServiceInterval, kafkaServiceTimeout, kafkaProps,
65+
seekPosition);
66+
67+
this.seekPosition = new OMEventListenerLedgerPollerSeekPosition();
68+
69+
this.ledgerPoller = new OMEventListenerLedgerPoller(
70+
kafkaServiceInterval, TimeUnit.MILLISECONDS,
71+
COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE,
72+
kafkaServiceTimeout, pluginContext, conf,
73+
seekPosition,
74+
this::handleCompletedRequest);
75+
}
76+
77+
@Override
78+
public void start() {
79+
ledgerPoller.start();
80+
81+
try {
82+
kafkaClient.initialize();
83+
} catch (IOException ex) {
84+
LOG.error("Failure initializing kafka client", ex);
85+
}
86+
}
87+
88+
@Override
89+
public void shutdown() {
90+
try {
91+
kafkaClient.shutdown();
92+
} catch (IOException ex) {
93+
LOG.error("Failure shutting down kafka client", ex);
94+
}
95+
96+
ledgerPoller.shutdown();
97+
}
98+
99+
// callback called by OMEventListenerLedgerPoller
100+
public void handleCompletedRequest(OmCompletedRequestInfo completedRequestInfo) {
101+
LOG.info("Processing {}", completedRequestInfo);
102+
103+
// stub event until we implement a strategy to convert the events to
104+
// a user facing schema (e.g. S3)
105+
String event = String.format("{\"key\":\"%s/%s/%s\", \"type\":\"%s\"}",
106+
completedRequestInfo.getVolumeName(),
107+
completedRequestInfo.getBucketName(),
108+
completedRequestInfo.getKeyName(),
109+
String.valueOf(completedRequestInfo.getCmdType()));
110+
111+
LOG.info("Sending {}", event);
112+
113+
try {
114+
kafkaClient.send(event);
115+
} catch (IOException ex) {
116+
LOG.error("Failure to send event {}", event, ex);
117+
return;
118+
}
119+
120+
// we can update the seek position
121+
seekPosition.set(String.valueOf(completedRequestInfo.getTrxLogIndex()));
122+
}
123+
124+
static class KafkaClientWrapper {
125+
public static final Logger LOG = LoggerFactory.getLogger(KafkaClientWrapper.class);
126+
127+
private final String topic;
128+
private final Properties kafkaProps;
129+
130+
private KafkaProducer<String, String> producer;
131+
132+
KafkaClientWrapper(Properties kafkaProps) {
133+
this.topic = (String) kafkaProps.get("topic");
134+
this.kafkaProps = kafkaProps;
135+
}
136+
137+
public void initialize() throws IOException {
138+
LOG.info("Initializing with properties {}", kafkaProps);
139+
this.producer = new KafkaProducer<>(kafkaProps);
140+
141+
ensureTopicExists();
142+
}
143+
144+
public void shutdown() throws IOException {
145+
producer.close();
146+
}
147+
148+
public void send(String message) throws IOException {
149+
if (producer != null) {
150+
LOG.info("Producing event {}", message);
151+
ProducerRecord<String, String> producerRecord =
152+
new ProducerRecord<>(topic, message);
153+
producer.send(producerRecord);
154+
} else {
155+
LOG.warn("Producing event {} [KAFKA DOWN]", message);
156+
}
157+
}
158+
159+
private void ensureTopicExists() {
160+
try (AdminClient adminClient = AdminClient.create(kafkaProps)) {
161+
LOG.info("Creating kafka topic: {}", this.topic);
162+
NewTopic newTopic = new NewTopic(this.topic, 1, (short) 1);
163+
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
164+
adminClient.close();
165+
} catch (Exception ex) {
166+
LOG.error("Failed to create topic: {}", this.topic, ex);
167+
}
168+
}
169+
}
170+
}

0 commit comments

Comments
 (0)