3333import org .apache .commons .configuration2 .PropertiesConfiguration ;
3434import org .apache .commons .io .FileUtils ;
3535import org .apache .pinot .segment .local .io .util .PinotDataBitSet ;
36- import org .apache .pinot .segment .local .segment .creator .impl .BaseSegmentCreator ;
3736import org .apache .pinot .segment .local .segment .creator .impl .SegmentDictionaryCreator ;
3837import org .apache .pinot .segment .local .segment .creator .impl .fwd .MultiValueVarByteRawIndexCreator ;
3938import org .apache .pinot .segment .local .segment .creator .impl .stats .AbstractColumnStatisticsCollector ;
@@ -397,16 +396,23 @@ private boolean shouldChangeRawCompressionType(String column, SegmentDirectory.R
397396 // Get the new compression type.
398397 ForwardIndexConfig forwardIndexConfig = _fieldIndexConfigs .get (column ).getConfig (StandardIndexes .forward ());
399398 ChunkCompressionType newCompressionType = forwardIndexConfig .getChunkCompressionType ();
400- if (forwardIndexConfig .getCompressionCodecSpec () != null ) {
401- String newCompressionCodecSpec = forwardIndexConfig .getCompressionCodecSpec ().toConfigString ();
402- String existingCompressionCodecSpec =
403- metadataProperties .getString (getKeyFor (column , FORWARD_INDEX_COMPRESSION_CODEC ), null );
404- if (existingCompressionCodecSpec != null ) {
405- return !existingCompressionCodecSpec .equals (newCompressionCodecSpec );
399+ String existingCompressionCodecSpec =
400+ metadataProperties .getString (getKeyFor (column , FORWARD_INDEX_COMPRESSION_CODEC ), null );
401+ if (forwardIndexConfig .getCompressionCodecSpec () != null
402+ && forwardIndexConfig .getCompressionCodecSpec ().hasLevel ()) {
403+ // Segments written before explicit codec metadata was introduced have an unknown level. Rewrite them when the
404+ // table config now specifies an explicit level so the persisted forward index matches the configured codec spec.
405+ if (existingCompressionCodecSpec == null ) {
406+ return true ;
406407 }
407- if (forwardIndexConfig .getCompressionCodecSpec ().hasLevel ()) {
408+ String newCompressionCodecSpec = forwardIndexConfig .getCompressionCodecSpec ().toConfigString ();
409+ if (!newCompressionCodecSpec .equals (existingCompressionCodecSpec )) {
408410 return true ;
409411 }
412+ } else if (existingCompressionCodecSpec != null ) {
413+ // Persisted codec specs are only used for explicit levels. If an existing segment has one and the new
414+ // configuration does not, rewrite the forward index to restore the plain codec behavior.
415+ return true ;
410416 }
411417
412418 // Note that default compression type (PASS_THROUGH for metric and LZ4 for dimension) is not considered if the
@@ -476,12 +482,8 @@ private void rewriteForwardIndexForCompressionChange(String column, SegmentDirec
476482 // called during segmentWriter.close().
477483 segmentWriter .removeIndex (column , StandardIndexes .forward ());
478484 LoaderUtils .writeIndexToV3Format (segmentWriter , column , fwdIndexFile , StandardIndexes .forward ());
479- Map <String , String > metadataProperties = new HashMap <>();
480- BaseSegmentCreator .addForwardIndexCompressionCodecInfo (metadataProperties , column ,
485+ updateForwardIndexCompressionCodecMetadata (column ,
481486 _fieldIndexConfigs .get (column ).getConfig (StandardIndexes .forward ()), hasDictionary );
482- if (!metadataProperties .isEmpty ()) {
483- SegmentMetadataUtils .updateMetadataProperties (_segmentDirectory , metadataProperties );
484- }
485487
486488 // Delete the marker file.
487489 FileUtils .deleteQuietly (inProgress );
@@ -919,6 +921,7 @@ private void createDictBasedForwardIndex(String column, SegmentDirectory.Writer
919921 metadataProperties .put (getKeyFor (column , CARDINALITY ), String .valueOf (cardinality ));
920922 metadataProperties .put (getKeyFor (column , BITS_PER_ELEMENT ),
921923 String .valueOf (PinotDataBitSet .getNumBitsPerValue (cardinality - 1 )));
924+ metadataProperties .put (getKeyFor (column , FORWARD_INDEX_COMPRESSION_CODEC ), null );
922925 SegmentMetadataUtils .updateMetadataProperties (_segmentDirectory , metadataProperties );
923926
924927 // We remove indexes that have to be rewritten when a dictEnabled is toggled. Note that the respective index
@@ -971,14 +974,17 @@ private void disableDictionaryAndCreateRawForwardIndex(String column, SegmentDir
971974
972975 LOGGER .info ("Created raw forwardIndex. Updating metadata properties for segment={} and column={}" , segmentName ,
973976 column );
974- Map <String , String > metadataProperties = new HashMap <>();
975- metadataProperties .put (getKeyFor (column , HAS_DICTIONARY ), String .valueOf (false ));
976- metadataProperties .put (getKeyFor (column , DICTIONARY_ELEMENT_SIZE ), String .valueOf (0 ));
977- BaseSegmentCreator .addForwardIndexCompressionCodecInfo (metadataProperties , column ,
977+ PropertiesConfiguration metadataProperties =
978+ SegmentMetadataUtils .getPropertiesConfiguration (_segmentDirectory .getSegmentMetadata ());
979+ metadataProperties .setProperty (getKeyFor (column , HAS_DICTIONARY ), String .valueOf (false ));
980+ metadataProperties .setProperty (getKeyFor (column , DICTIONARY_ELEMENT_SIZE ), String .valueOf (0 ));
981+ updateForwardIndexCompressionCodecMetadata (metadataProperties , column ,
978982 _fieldIndexConfigs .get (column ).getConfig (StandardIndexes .forward ()), false );
979983 // TODO: See https://github.com/apache/pinot/pull/16921 for details
980- // metadataProperties.put(getKeyFor(column, BITS_PER_ELEMENT), String.valueOf(-1));
981- SegmentMetadataUtils .updateMetadataProperties (_segmentDirectory , metadataProperties );
984+ // metadataProperties.setProperty(getKeyFor(column, BITS_PER_ELEMENT), String.valueOf(-1));
985+ SegmentMetadataUtils .savePropertiesConfiguration (metadataProperties , _segmentDirectory .getSegmentMetadata ()
986+ .getIndexDir ());
987+ _segmentDirectory .reloadMetadata ();
982988
983989 // Remove range index, inverted index and FST index.
984990 removeDictRelatedIndexes (column , segmentWriter );
@@ -989,6 +995,28 @@ private void disableDictionaryAndCreateRawForwardIndex(String column, SegmentDir
989995 LOGGER .info ("Created raw based forward index for segment: {}, column: {}" , segmentName , column );
990996 }
991997
998+ private void updateForwardIndexCompressionCodecMetadata (String column ,
999+ @ Nullable ForwardIndexConfig forwardIndexConfig , boolean hasDictionary )
1000+ throws Exception {
1001+ PropertiesConfiguration metadataProperties =
1002+ SegmentMetadataUtils .getPropertiesConfiguration (_segmentDirectory .getSegmentMetadata ());
1003+ updateForwardIndexCompressionCodecMetadata (metadataProperties , column , forwardIndexConfig , hasDictionary );
1004+ SegmentMetadataUtils .savePropertiesConfiguration (metadataProperties , _segmentDirectory .getSegmentMetadata ()
1005+ .getIndexDir ());
1006+ _segmentDirectory .reloadMetadata ();
1007+ }
1008+
1009+ private static void updateForwardIndexCompressionCodecMetadata (PropertiesConfiguration metadataProperties ,
1010+ String column , @ Nullable ForwardIndexConfig forwardIndexConfig , boolean hasDictionary ) {
1011+ String metadataKey = getKeyFor (column , FORWARD_INDEX_COMPRESSION_CODEC );
1012+ if (!hasDictionary && forwardIndexConfig != null && forwardIndexConfig .getCompressionCodecSpec () != null
1013+ && forwardIndexConfig .getCompressionCodecSpec ().hasLevel ()) {
1014+ metadataProperties .setProperty (metadataKey , forwardIndexConfig .getCompressionCodecSpec ().toConfigString ());
1015+ } else {
1016+ metadataProperties .clearProperty (metadataKey );
1017+ }
1018+ }
1019+
9921020 private void rewriteDictToRawForwardIndex (ColumnMetadata columnMetadata , SegmentDirectory .Writer segmentWriter ,
9931021 File indexDir )
9941022 throws Exception {
0 commit comments