Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ public class PinotLLCRealtimeSegmentManager {

// Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
// Timeout for calling stream metadata provider APIs
private static final long STREAM_FETCH_TIMEOUT_MS = 5_000L;

// TODO: make this configurable with default set to 10
/**
Expand Down Expand Up @@ -931,7 +933,11 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig st
int numReplicas) {
String realtimeTableName = tableConfig.getTableName();
String segmentName = newLLCSegmentName.getSegmentName();
String startOffset = committingSegmentDescriptor.getNextOffset();

// Handle offset auto reset
String nextOffset = committingSegmentDescriptor.getNextOffset();
String startOffset = computeStartOffset(
nextOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());

LOGGER.info(
"Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}",
Expand Down Expand Up @@ -962,6 +968,65 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig st
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}

private String computeStartOffset(String nextOffset, StreamConfig streamConfig, int partitionId) {
if (!streamConfig.isEnableOffsetAutoReset()) {
return nextOffset;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we add a log here of skipping this ?
A change in streamConfig is not recorded and debugging old segment commits might get hard.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is enabled, the log

LOGGER.info("Latest offset of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId, latestOffset);

will be printed.
And right after this method,

LOGGER.info(
        "Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}", segmentName, startOffset, creationTimeMs);

would be printed.
With the help of those 2 logs, we should be able to understand if autoReset is enabled or not.

}
long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
if (timeThreshold <= 0 && offsetThreshold <= 0) {
LOGGER.warn("Invalid offset auto reset configuration for table: {}, topic: {}. "
+ "timeThreshold: {}, offsetThreshold: {}",
streamConfig.getTableNameWithType(), streamConfig.getTopicName(), timeThreshold, offsetThreshold);
return nextOffset;
}
String clientId = getTableTopicUniqueClientId(streamConfig);
StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this to a separate function getLargestOffset

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a util similar to

  @VisibleForTesting
  Set<Integer> getPartitionIds(StreamConfig streamConfig)
      throws Exception {
    String clientId = StreamConsumerFactory.getUniqueClientId(
        PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
            + streamConfig.getTopicName());
    StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
    try (StreamMetadataProvider metadataProvider = consumerFactory.createStreamMetadataProvider(clientId)) {
      return metadataProvider.fetchPartitionIds(5000L);
    }
  }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already a separate method computeStartOffset, and we need to construct the consumer to get 2 offsets, "largest", "offsetAtSLA". If we want to extract, then that method needs to return a Pair of StreamPartitionMsgOffset or a wrapper class of 2. Feels it would be a overkill. It would be rare to get those 2 offsets in other usages.

StreamPartitionMsgOffsetFactory offsetFactory = consumerFactory.createStreamMsgOffsetFactory();
StreamPartitionMsgOffset nextOffsetWithType = offsetFactory.create(nextOffset);
StreamPartitionMsgOffset offsetAtSLA = null;
StreamPartitionMsgOffset latestOffset;
try (StreamMetadataProvider metadataProvider = consumerFactory.createPartitionMetadataProvider(clientId,
partitionId)) {
// Fetching timestamp from an offset is an expensive operation which requires reading the data,
// while fetching offset from timestamp is lightweight and only needs to read metadata.
// Hence, instead of checking if latestOffset's time - nextOffset's time < SLA, we would check
// (CurrentTime - SLA)'s offset > nextOffset.
// TODO: it is relying on System.currentTimeMillis() which might be affected by time drift. If we are able to
// get nextOffset's time, we should instead check (nextOffset's time + SLA)'s offset < latestOffset
latestOffset = metadataProvider.fetchStreamPartitionOffset(
OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS);
LOGGER.info("Latest offset of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId,
latestOffset);
if (timeThreshold > 0) {
offsetAtSLA =
metadataProvider.getOffsetAtTimestamp(partitionId, System.currentTimeMillis() - timeThreshold * 1000,
STREAM_FETCH_TIMEOUT_MS);
LOGGER.info("Offset at SLA of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId,
offsetAtSLA);
}
} catch (Exception e) {
LOGGER.warn("Not able to fetch the offset metadata, skip auto resetting offsets", e);
return nextOffset;
}
try {
if (timeThreshold > 0 && offsetAtSLA != null && offsetAtSLA.compareTo(nextOffsetWithType) < 0) {
LOGGER.info("Auto reset offset from {} to {} on partition {} because time threshold reached", nextOffset,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably want to log as warning given this can cause data loss. Same for line 1020

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the reset happens, users must have already set this up through the config explicitly. Then the happening of the reset is an "expected" behavior which I feel may not be good to be logged as warnings.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though this is "expected", it is definitely worth noting. We should also consider emitting a metric when offset reset happens because it means data loss

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, metrics is in plan. That part would be added afterwards.

latestOffset, partitionId);
return latestOffset.toString();
}
if (offsetThreshold > 0
&& Long.parseLong(latestOffset.toString()) - Long.parseLong(nextOffset) > offsetThreshold) {
LOGGER.info("Auto reset offset from {} to {} on partition {} because number of offsets threshold reached",
nextOffset, latestOffset, partitionId);
return latestOffset.toString();
}
} catch (Exception e) {
LOGGER.warn("Not able to compare the offsets, skip auto resetting offsets", e);
}
return nextOffset;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/**
* Fetches the latest offset from the stream for a given partition.
* Extracted as a separate method for better testability and reusability.
*/
@VisibleForTesting
String fetchLatestOffset(StreamConfig streamConfig, int partitionId) throws Exception {
String clientId = StreamConsumerFactory.getUniqueClientId(
String.format("%s-%s-%s-latest-offset",
PinotLLCRealtimeSegmentManager.class.getSimpleName(),
streamConfig.getTableNameWithType(),
streamConfig.getTopicName())
);
StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
try (StreamMetadataProvider metadataProvider = consumerFactory.createStreamMetadataProvider(clientId)) {
StreamPartitionMsgOffset latestOffset = metadataProvider.fetchStreamPartitionOffset(
OffsetCriteria.LARGEST_OFFSET_CRITERIA,
STREAM_FETCH_TIMEOUT_MS
);
return latestOffset.toString();
}
}

Adapt as per need.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Nullable
private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId,
int numPartitionGroups) {
Expand Down Expand Up @@ -1010,16 +1075,20 @@ public long getCommitTimeoutMS(String realtimeTableName) {
return commitTimeoutMS;
}

/**
 * Builds the unique stream client id for the given table + topic pair, used when creating
 * stream metadata providers for that table's topic.
 */
private String getTableTopicUniqueClientId(StreamConfig streamConfig) {
  String rawClientId = String.join("-", PinotLLCRealtimeSegmentManager.class.getSimpleName(),
      streamConfig.getTableNameWithType(), streamConfig.getTopicName());
  return StreamConsumerFactory.getUniqueClientId(rawClientId);
}

/**
* Fetches the partition ids for the stream. Some stream (e.g. Kinesis) might not support this operation, in which
* case exception will be thrown.
*/
@VisibleForTesting
Set<Integer> getPartitionIds(StreamConfig streamConfig)
throws Exception {
String clientId = StreamConsumerFactory.getUniqueClientId(
PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
+ streamConfig.getTopicName());
String clientId = getTableTopicUniqueClientId(streamConfig);
StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
try (StreamMetadataProvider metadataProvider = consumerFactory.createStreamMetadataProvider(clientId)) {
return metadataProvider.fetchPartitionIds(5000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,15 @@
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
Expand All @@ -91,6 +97,7 @@
import org.apache.pinot.util.TestUtils;
import org.apache.zookeeper.data.Stat;
import org.joda.time.Interval;
import org.mockito.MockedStatic;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -122,6 +129,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
static final String CRC = Long.toString(RANDOM.nextLong() & 0xFFFFFFFFL);
static final SegmentVersion SEGMENT_VERSION = RANDOM.nextBoolean() ? SegmentVersion.v1 : SegmentVersion.v3;
static final int NUM_DOCS = RANDOM.nextInt(Integer.MAX_VALUE) + 1;
static final long LATEST_OFFSET = PARTITION_OFFSET.getOffset() * 2 + NUM_DOCS;
static final int SEGMENT_SIZE_IN_BYTES = 100000000;
@AfterClass
public void tearDown()
Expand Down Expand Up @@ -325,6 +333,139 @@ public void testCommitSegment() {
assertNull(consumingSegmentZKMetadata);
}

/**
 * Commits a segment with offset auto reset enabled via the offset-count threshold and verifies
 * that the new CONSUMING segment starts from the stream's latest offset instead of the committed
 * segment's end offset.
 *
 * Fix: the last assertion previously re-checked the committed segment's creation time (copy-paste)
 * instead of the consuming segment's.
 */
@Test
public void testCommitSegmentWithOffsetAutoResetOnOffset()
    throws Exception {
  // Set up a new table with 2 replicas, 5 instances, 4 partitions
  PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
  FakePinotLLCRealtimeSegmentManager segmentManager =
      new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
  setUpNewTable(segmentManager, 2, 5, 4);
  Map<String, Map<String, String>> instanceStatesMap = segmentManager._idealState.getRecord().getMapFields();
  Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(segmentManager._tableConfig).get(0);
  streamConfigMap.put(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET, String.valueOf(true));
  streamConfigMap.put(StreamConfigProperties.OFFSET_AUTO_RESET_OFFSET_THRESHOLD_KEY, "100");
  segmentManager.makeTableConfig(streamConfigMap);

  // Mock the stream so the largest offset is far beyond the committed end offset, which should
  // trip the offset-count threshold.
  StreamConsumerFactory mockConsumerFactory = mock(StreamConsumerFactory.class);
  StreamMetadataProvider mockMetadataProvider = mock(StreamMetadataProvider.class);
  when(mockConsumerFactory.createPartitionMetadataProvider(anyString(), anyInt())).thenReturn(mockMetadataProvider);
  when(mockConsumerFactory.createStreamMsgOffsetFactory()).thenReturn(new LongMsgOffsetFactory());
  when(mockMetadataProvider.fetchStreamPartitionOffset(eq(OffsetCriteria.LARGEST_OFFSET_CRITERIA),
      anyLong())).thenReturn(new LongMsgOffset(LATEST_OFFSET));
  when(mockMetadataProvider.getOffsetAtTimestamp(eq(0), anyLong(), anyLong())).thenReturn(PARTITION_OFFSET);

  try (MockedStatic<StreamConsumerFactoryProvider> mockedStaticProvider = mockStatic(
      StreamConsumerFactoryProvider.class)) {

    mockedStaticProvider.when(() -> StreamConsumerFactoryProvider.create(segmentManager._streamConfigs.get(0)))
        .thenReturn(mockConsumerFactory);

    // Commit a segment for partition group 0
    String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
    String endOffset = new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString();
    CommittingSegmentDescriptor committingSegmentDescriptor =
        new CommittingSegmentDescriptor(committingSegment, endOffset, SEGMENT_SIZE_IN_BYTES);
    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
    segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);

    // Verify instance states for committed segment and new consuming segment
    Map<String, String> committedSegmentInstanceStateMap = instanceStatesMap.get(committingSegment);
    assertNotNull(committedSegmentInstanceStateMap);
    assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
        Collections.singleton(SegmentStateModel.ONLINE));

    String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
    Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
    assertNotNull(consumingSegmentInstanceStateMap);
    assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
        Collections.singleton(SegmentStateModel.CONSUMING));

    // Verify segment ZK metadata for the committed segment
    SegmentZKMetadata committedSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(committingSegment);
    assertEquals(committedSegmentZKMetadata.getStatus(), Status.DONE);
    assertEquals(committedSegmentZKMetadata.getStartOffset(), PARTITION_OFFSET.toString());
    assertEquals(committedSegmentZKMetadata.getEndOffset(), endOffset);
    assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
    assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
    assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION.name());
    assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
    assertEquals(committedSegmentZKMetadata.getSizeInBytes(), SEGMENT_SIZE_IN_BYTES);

    // Verify the new consuming segment starts from the auto-reset (latest) offset
    SegmentZKMetadata consumingSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(consumingSegment);
    assertEquals(consumingSegmentZKMetadata.getStatus(), Status.IN_PROGRESS);
    assertEquals(consumingSegmentZKMetadata.getStartOffset(), String.valueOf(LATEST_OFFSET));
    // Fixed copy-paste: assert the CONSUMING segment's creation time (the committed segment's
    // creation time is already asserted above).
    assertEquals(consumingSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
  }
}

/**
 * Commits a segment with offset auto reset enabled via the time threshold and verifies that the
 * new CONSUMING segment starts from the stream's latest offset instead of the committed segment's
 * end offset.
 *
 * Fix: the last assertion previously re-checked the committed segment's creation time (copy-paste)
 * instead of the consuming segment's.
 */
@Test
public void testCommitSegmentWithOffsetAutoResetOnTime()
    throws Exception {
  // Set up a new table with 2 replicas, 5 instances, 4 partitions
  PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
  FakePinotLLCRealtimeSegmentManager segmentManager =
      new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
  setUpNewTable(segmentManager, 2, 5, 4);
  Map<String, Map<String, String>> instanceStatesMap = segmentManager._idealState.getRecord().getMapFields();
  Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(segmentManager._tableConfig).get(0);
  streamConfigMap.put(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET, String.valueOf(true));
  streamConfigMap.put(StreamConfigProperties.OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY, "1800");
  segmentManager.makeTableConfig(streamConfigMap);

  // Mock the stream so the offset at (now - threshold) trips the time-threshold comparison.
  // NOTE: if NUM_DOCS were ever 1, offsetAtSLA would equal nextOffset and no reset would occur;
  // NUM_DOCS is random in [1, Integer.MAX_VALUE] so this is astronomically unlikely but not zero.
  StreamConsumerFactory mockConsumerFactory = mock(StreamConsumerFactory.class);
  StreamMetadataProvider mockMetadataProvider = mock(StreamMetadataProvider.class);
  when(mockConsumerFactory.createPartitionMetadataProvider(anyString(), anyInt())).thenReturn(mockMetadataProvider);
  when(mockConsumerFactory.createStreamMsgOffsetFactory()).thenReturn(new LongMsgOffsetFactory());
  when(mockMetadataProvider.fetchStreamPartitionOffset(eq(OffsetCriteria.LARGEST_OFFSET_CRITERIA),
      anyLong())).thenReturn(new LongMsgOffset(LATEST_OFFSET));
  when(mockMetadataProvider.getOffsetAtTimestamp(eq(0), anyLong(), anyLong())).thenReturn(
      new LongMsgOffset(PARTITION_OFFSET.getOffset() + 1L));

  try (MockedStatic<StreamConsumerFactoryProvider> mockedStaticProvider = mockStatic(
      StreamConsumerFactoryProvider.class)) {

    mockedStaticProvider.when(() -> StreamConsumerFactoryProvider.create(segmentManager._streamConfigs.get(0)))
        .thenReturn(mockConsumerFactory);

    // Commit a segment for partition group 0
    String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
    String endOffset = new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString();
    CommittingSegmentDescriptor committingSegmentDescriptor =
        new CommittingSegmentDescriptor(committingSegment, endOffset, SEGMENT_SIZE_IN_BYTES);
    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
    segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);

    // Verify instance states for committed segment and new consuming segment
    Map<String, String> committedSegmentInstanceStateMap = instanceStatesMap.get(committingSegment);
    assertNotNull(committedSegmentInstanceStateMap);
    assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
        Collections.singleton(SegmentStateModel.ONLINE));

    String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
    Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
    assertNotNull(consumingSegmentInstanceStateMap);
    assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
        Collections.singleton(SegmentStateModel.CONSUMING));

    // Verify segment ZK metadata for the committed segment
    SegmentZKMetadata committedSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(committingSegment);
    assertEquals(committedSegmentZKMetadata.getStatus(), Status.DONE);
    assertEquals(committedSegmentZKMetadata.getStartOffset(), PARTITION_OFFSET.toString());
    assertEquals(committedSegmentZKMetadata.getEndOffset(), endOffset);
    assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
    assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
    assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION.name());
    assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
    assertEquals(committedSegmentZKMetadata.getSizeInBytes(), SEGMENT_SIZE_IN_BYTES);

    // Verify the new consuming segment starts from the auto-reset (latest) offset
    SegmentZKMetadata consumingSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(consumingSegment);
    assertEquals(consumingSegmentZKMetadata.getStatus(), Status.IN_PROGRESS);
    assertEquals(consumingSegmentZKMetadata.getStartOffset(), String.valueOf(LATEST_OFFSET));
    // Fixed copy-paste: assert the CONSUMING segment's creation time (the committed segment's
    // creation time is already asserted above).
    assertEquals(consumingSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
  }
}

/**
* Test cases for the scenario where stream partitions increase, and the validation manager is attempting to create
* segments for new partitions. This test assumes that all other factors remain the same (no error conditions or
Expand Down Expand Up @@ -1725,6 +1866,13 @@ void makeTableConfig() {
_streamConfigs = IngestionConfigUtils.getStreamConfigs(_tableConfig);
}

// Rebuilds the REALTIME table config with the given stream config map and refreshes the
// derived stream configs used by the fake segment manager.
void makeTableConfig(Map<String, String> streamConfigMap) {
  TableConfigBuilder builder = new TableConfigBuilder(TableType.REALTIME)
      .setTableName(RAW_TABLE_NAME)
      .setNumReplicas(_numReplicas)
      .setStreamConfigs(streamConfigMap);
  _tableConfig = builder.build();
  _streamConfigs = IngestionConfigUtils.getStreamConfigs(_tableConfig);
}

void makeConsumingInstancePartitions() {
List<String> instances = new ArrayList<>(_numInstances);
for (int i = 0; i < _numInstances; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ public List<TopicMetadata> getTopics() {
}
}

/**
 * Returns the earliest offset whose message timestamp is greater than or equal to the given
 * timestamp, or {@code null} when the partition has no such message (e.g. an idle topic, or a
 * timestamp newer than the latest message).
 *
 * Fix: Kafka's offsetsForTimes() maps the partition to {@code null} when no matching offset
 * exists; the previous implementation dereferenced the lookup result unconditionally and would
 * throw NPE in that case.
 * NOTE(review): confirm the StreamMetadataProvider contract permits a null return; the offset
 * auto-reset caller already null-checks this value.
 */
@Override
public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis, long timeoutMillis) {
  var offsetAndTimestamp = _consumer.offsetsForTimes(Map.of(_topicPartition, timestampMillis),
      Duration.ofMillis(timeoutMillis)).get(_topicPartition);
  return offsetAndTimestamp == null ? null : new LongMsgOffset(offsetAndTimestamp.offset());
}

public static class KafkaTopicMetadata implements TopicMetadata {
private String _name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ public KafkaTopicMetadata setName(String name) {
return this;
}
}

/**
 * Returns the earliest offset whose message timestamp is greater than or equal to the given
 * timestamp, or {@code null} when the partition has no such message (e.g. an idle topic, or a
 * timestamp newer than the latest message).
 *
 * Fix: Kafka's offsetsForTimes() maps the partition to {@code null} when no matching offset
 * exists; the previous implementation dereferenced the lookup result unconditionally and would
 * throw NPE in that case.
 * NOTE(review): confirm the StreamMetadataProvider contract permits a null return; the offset
 * auto-reset caller already null-checks this value.
 */
@Override
public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis, long timeoutMillis) {
  var offsetAndTimestamp = _consumer.offsetsForTimes(Map.of(_topicPartition, timestampMillis),
      Duration.ofMillis(timeoutMillis)).get(_topicPartition);
  return offsetAndTimestamp == null ? null : new LongMsgOffset(offsetAndTimestamp.offset());
}

@Override
public void close()
throws IOException {
Expand Down
Loading
Loading