Skip to content

Commit 749e20a

Browse files
author
Colm Dougan
committed
HDDS-14006. Address Copilot review comments
Also make the seek position string be deserialized from a string to a long earlier in the process so it doesn't have to be deserialized in multiple places and the DB list calls can just take a Long as the seek position
1 parent 4bce19b commit 749e20a

7 files changed

Lines changed: 22 additions & 17 deletions

File tree

hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public interface OMEventListenerPluginContext {
3131

3232
// TODO: should we allow plugins to pass in maxResults or just limit
3333
// them to some predefined value for safety? e.g. 10K
34-
List<OmCompletedRequestInfo> listCompletedRequestInfo(String startKey, int maxResults) throws IOException;
34+
List<OmCompletedRequestInfo> listCompletedRequestInfo(Long startKey, int maxResults) throws IOException;
3535

3636
// XXX: this probably doesn't belong here
3737
String getThreadNamePrefix();

hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ List<OmVolumeArgs> listVolumes(String userName, String prefix,
354354
* @return a list of {@link OmCompletedRequestInfo}
355355
* @throws IOException
356356
*/
357-
List<OmCompletedRequestInfo> listCompletedRequestInfo(String startKey, int maxResults) throws IOException;
357+
List<OmCompletedRequestInfo> listCompletedRequestInfo(Long startKey, int maxResults) throws IOException;
358358

359359
/**
360360
* Returns the names of up to {@code count} open keys whose age is

hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext plu
6060
long kafkaServiceTimeout = 300 * 1000;
6161

6262
LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," +
63-
"serviceTimeout={}, kafkaProps={}, seekPosition={}",
64-
kafkaServiceInterval, kafkaServiceTimeout, kafkaProps,
63+
"serviceTimeout={}, seekPosition={}",
64+
kafkaServiceInterval, kafkaServiceTimeout,
6565
seekPosition);
6666

6767
this.seekPosition = new OMEventListenerLedgerPollerSeekPosition();
@@ -135,14 +135,16 @@ static class KafkaClientWrapper {
135135
}
136136

137137
public void initialize() throws IOException {
138-
LOG.info("Initializing with properties {}", kafkaProps);
138+
LOG.info("Initializing kafka client for topic {}", topic);
139139
this.producer = new KafkaProducer<>(kafkaProps);
140140

141141
ensureTopicExists();
142142
}
143143

144144
public void shutdown() throws IOException {
145-
producer.close();
145+
if (producer != null) {
146+
producer.close();
147+
}
146148
}
147149

148150
public void send(String message) throws IOException {

hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.atomic.AtomicBoolean;
2424
import java.util.concurrent.atomic.AtomicLong;
2525
import java.util.function.Consumer;
26+
import org.apache.commons.lang3.StringUtils;
2627
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
2728
import org.apache.hadoop.hdds.utils.BackgroundService;
2829
import org.apache.hadoop.hdds.utils.BackgroundTask;
@@ -59,7 +60,7 @@ public OMEventListenerLedgerPoller(long interval, TimeUnit unit,
5960

6061
super("OMEventListenerLedgerPoller",
6162
interval,
62-
TimeUnit.MILLISECONDS,
63+
unit,
6364
poolSize,
6465
serviceTimeout, pluginContext.getThreadNamePrefix());
6566

@@ -116,13 +117,15 @@ public BackgroundTaskResult call() {
116117
LOG.debug("Running OMEventListenerLedgerPoller");
117118
}
118119
if (runCount.get() == 0) {
119-
seekPosition.initSeekPosition();
120+
seekPosition.set(seekPosition.initSeekPosition());
120121
}
121122
getRunCount().incrementAndGet();
122123

123124
try {
125+
String startKeyStr = seekPosition.get();
126+
Long startKey = StringUtils.isNotBlank(startKeyStr) ? Long.valueOf(startKeyStr) : null;
124127
for (OmCompletedRequestInfo requestInfo : pluginContext.listCompletedRequestInfo(
125-
seekPosition.get(), MAX_RESULTS)) {
128+
startKey, MAX_RESULTS)) {
126129
callback.accept(requestInfo);
127130
}
128131
successRunCount.incrementAndGet();

hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import org.mockito.junit.jupiter.MockitoExtension;
3939

4040
/**
41-
* Tests {@link OMEventListenerPluginManager}.
41+
* Tests {@link OMEventListenerKafkaPublisher}.
4242
*/
4343
@ExtendWith(MockitoExtension.class)
4444
public class TestOMEventListenerKafkaPublisher {
@@ -85,13 +85,13 @@ private List<String> captureEventsProducedByOperation(OmCompletedRequestInfo op,
8585
List<String> events = new ArrayList<>();
8686

8787
OMEventListenerKafkaPublisher plugin = new OMEventListenerKafkaPublisher();
88-
try (MockedConstruction<OMEventListenerKafkaPublisher.KafkaClientWrapper> mockeKafkaClientWrapper =
88+
try (MockedConstruction<OMEventListenerKafkaPublisher.KafkaClientWrapper> mockedKafkaClientWrapper =
8989
mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) {
9090

9191
plugin.initialize(conf, pluginContext);
9292
plugin.handleCompletedRequest(op);
9393

94-
OMEventListenerKafkaPublisher.KafkaClientWrapper mock = mockeKafkaClientWrapper.constructed().get(0);
94+
OMEventListenerKafkaPublisher.KafkaClientWrapper mock = mockedKafkaClientWrapper.constructed().get(0);
9595
ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
9696
verify(mock, times(expectEvents)).send(argument.capture());
9797

@@ -144,7 +144,7 @@ public void testCreateDirectoryRequestProducesS3CreatedEvent() throws Interrupte
144144
}
145145

146146
@Test
147-
public void testRenameRequestProducesS3CreateAndDeleteEvents() throws InterruptedException, IOException {
147+
public void testRenameRequestProducesRenameKeyEvent() throws InterruptedException, IOException {
148148
OmCompletedRequestInfo renameRequest = buildCompletedRequestInfo(4L, Type.RenameKey, "some/key4",
149149
new OperationArgs.RenameKeyArgs("some/key_RENAMED"));
150150

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,7 +1344,7 @@ private List<OmVolumeArgs> listAllVolumes(String prefix, String startKey,
13441344
* {@inheritDoc}
13451345
*/
13461346
@Override
1347-
public List<OmCompletedRequestInfo> listCompletedRequestInfo(final String startKey,
1347+
public List<OmCompletedRequestInfo> listCompletedRequestInfo(final Long startKey,
13481348
final int maxResults)
13491349
throws IOException {
13501350
List<OmCompletedRequestInfo> results = new ArrayList<>();
@@ -1354,11 +1354,11 @@ public List<OmCompletedRequestInfo> listCompletedRequestInfo(final String startK
13541354
tableIterator = getCompletedRequestInfoTable().iterator()) {
13551355

13561356
boolean skipFirst = false;
1357-
if (StringUtils.isNotBlank(startKey)) {
1357+
if (startKey != null) {
13581358
// TODO: what happens if the seek position is no longer
13591359
// available? Do we go to the end of the list
13601360
// or the first key > startKey
1361-
tableIterator.seek(Long.valueOf(startKey));
1361+
tableIterator.seek(startKey);
13621362
skipFirst = true;
13631363
}
13641364

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public boolean isLeaderReady() {
4141
// TODO: should we allow plugins to pass in maxResults or just limit
4242
// them to some predefined value for safety? e.g. 10K
4343
@Override
44-
public List<OmCompletedRequestInfo> listCompletedRequestInfo(String startKey, int maxResults) throws IOException {
44+
public List<OmCompletedRequestInfo> listCompletedRequestInfo(Long startKey, int maxResults) throws IOException {
4545
return ozoneManager.getMetadataManager().listCompletedRequestInfo(startKey, maxResults);
4646
}
4747

0 commit comments

Comments
 (0)