@@ -977,11 +977,16 @@ private void createNewSegmentZKMetadataWithOffsetAutoReset(TableConfig tableConf
977977 startOffset );
978978 }
979979
980- private String computeStartOffset (
981- String nextOffset , StreamConfig streamConfig , int partitionId ) {
980+ private String computeStartOffset (String nextOffset , StreamConfig streamConfig , int partitionId ) {
981+ if (!streamConfig .isEnableOffsetAutoReset ()) {
982+ return nextOffset ;
983+ }
982984 long timeThreshold = streamConfig .getOffsetAutoResetTimeSecThreshold ();
983985 int offsetThreshold = streamConfig .getOffsetAutoResetOffsetThreshold ();
984986 if (timeThreshold <= 0 && offsetThreshold <= 0 ) {
987+ LOGGER .warn ("Invalid offset auto reset configuration for table: {}, topic: {}. "
988+ + "timeThreshold: {}, offsetThreshold: {}" ,
989+ streamConfig .getTableNameWithType (), streamConfig .getTopicName (), timeThreshold , offsetThreshold );
985990 return nextOffset ;
986991 }
987992 String clientId =
@@ -990,40 +995,38 @@ private String computeStartOffset(
990995 StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider .create (streamConfig );
991996 StreamPartitionMsgOffset offsetAtSLA ;
992997 StreamPartitionMsgOffset latestOffset ;
993- try (StreamMetadataProvider metadataProvider = consumerFactory .createPartitionMetadataProvider (
994- clientId , partitionId )) {
998+ try (StreamMetadataProvider metadataProvider = consumerFactory .createPartitionMetadataProvider (clientId ,
999+ partitionId )) {
9951000 // Fetching timestamp from an offset is an expensive operation which requires reading the data,
9961001 // while fetching offset from timestamp is lightweight and only needs to read metadata.
9971002 // Hence, instead of checking if latestOffset's time - nextOffset's time < SLA, we would check
9981003 // (CurrentTime - SLA)'s offset > nextOffset.
9991004 // TODO: it is relying on System.currentTimeMillis() which might be affected by time drift. If we are able to
10001005 // get nextOffset's time, we should instead check (nextOffset's time + SLA)'s offset < latestOffset
10011006 latestOffset = metadataProvider .fetchStreamPartitionOffset (OffsetCriteria .LARGEST_OFFSET_CRITERIA , 5000 );
1002- LOGGER .info ("Latest offset of topic {} and partition {} is {}" ,
1003- streamConfig . getTopicName (), partitionId , latestOffset );
1004- offsetAtSLA = metadataProvider . getOffsetAtTimestamp (
1005- partitionId , System .currentTimeMillis () - timeThreshold * 1000 );
1006- LOGGER .info ("Offset at SLA of topic {} and partition {} is {}" ,
1007- streamConfig . getTopicName (), partitionId , offsetAtSLA );
1007+ LOGGER .info ("Latest offset of topic {} and partition {} is {}" , streamConfig . getTopicName (), partitionId ,
1008+ latestOffset );
1009+ offsetAtSLA =
1010+ metadataProvider . getOffsetAtTimestamp ( partitionId , System .currentTimeMillis () - timeThreshold * 1000 );
1011+ LOGGER .info ("Offset at SLA of topic {} and partition {} is {}" , streamConfig . getTopicName (), partitionId ,
1012+ offsetAtSLA );
10081013 } catch (Exception e ) {
10091014 LOGGER .warn ("Not able to fetch the offset metadata, skip auto resetting offsets" , e );
10101015 return nextOffset ;
10111016 }
10121017 try {
1013- if (timeThreshold > 0 && offsetAtSLA != null
1014- && Long .valueOf (offsetAtSLA .toString ()) > Long .valueOf (nextOffset )) {
1015- LOGGER .info ("Auto reset offset from {} to {} on partition {} because time threshold reached" ,
1016- nextOffset , latestOffset , partitionId );
1018+ if (timeThreshold > 0 && offsetAtSLA != null && Long .valueOf (offsetAtSLA .toString ()) > Long .valueOf (nextOffset )) {
1019+ LOGGER .info ("Auto reset offset from {} to {} on partition {} because time threshold reached" , nextOffset ,
1020+ latestOffset , partitionId );
10171021 return latestOffset .toString ();
10181022 }
1019- if (offsetThreshold > 0
1020- && Long .valueOf (latestOffset .toString ()) - Long .valueOf (nextOffset ) > offsetThreshold ) {
1023+ if (offsetThreshold > 0 && Long .valueOf (latestOffset .toString ()) - Long .valueOf (nextOffset ) > offsetThreshold ) {
10211024 LOGGER .info ("Auto reset offset from {} to {} on partition {} because number of offsets threshold reached" ,
10221025 nextOffset , latestOffset , partitionId );
10231026 return latestOffset .toString ();
10241027 }
10251028 } catch (Exception e ) {
1026- LOGGER .warn ("Not able to convert the offset to LONG type , skip auto resetting offsets" , e );
1029+ LOGGER .warn ("Not able to compare the offsets , skip auto resetting offsets" , e );
10271030 }
10281031 return nextOffset ;
10291032 }
0 commit comments