3333import org .apache .commons .configuration2 .PropertiesConfiguration ;
3434import org .apache .commons .io .FileUtils ;
3535import org .apache .pinot .segment .local .io .util .PinotDataBitSet ;
36- import org .apache .pinot .segment .local .segment .creator .impl .BaseSegmentCreator ;
3736import org .apache .pinot .segment .local .segment .creator .impl .SegmentDictionaryCreator ;
3837import org .apache .pinot .segment .local .segment .creator .impl .fwd .MultiValueVarByteRawIndexCreator ;
3938import org .apache .pinot .segment .local .segment .creator .impl .stats .AbstractColumnStatisticsCollector ;
@@ -397,16 +396,18 @@ private boolean shouldChangeRawCompressionType(String column, SegmentDirectory.R
397396 // Get the new compression type.
398397 ForwardIndexConfig forwardIndexConfig = _fieldIndexConfigs .get (column ).getConfig (StandardIndexes .forward ());
399398 ChunkCompressionType newCompressionType = forwardIndexConfig .getChunkCompressionType ();
400- if (forwardIndexConfig .getCompressionCodecSpec () != null ) {
399+ String existingCompressionCodecSpec =
400+ metadataProperties .getString (getKeyFor (column , FORWARD_INDEX_COMPRESSION_CODEC ), null );
401+ if (forwardIndexConfig .getCompressionCodecSpec () != null
402+ && forwardIndexConfig .getCompressionCodecSpec ().hasLevel ()) {
401403 String newCompressionCodecSpec = forwardIndexConfig .getCompressionCodecSpec ().toConfigString ();
402- String existingCompressionCodecSpec =
403- metadataProperties .getString (getKeyFor (column , FORWARD_INDEX_COMPRESSION_CODEC ), null );
404- if (existingCompressionCodecSpec != null ) {
405- return !existingCompressionCodecSpec .equals (newCompressionCodecSpec );
406- }
407- if (forwardIndexConfig .getCompressionCodecSpec ().hasLevel ()) {
404+ if (!newCompressionCodecSpec .equals (existingCompressionCodecSpec )) {
408405 return true ;
409406 }
407+ } else if (existingCompressionCodecSpec != null ) {
408+ // Persisted codec specs are only used for explicit levels. If an existing segment has one and the new
409+ // configuration does not, rewrite the forward index to restore the plain codec behavior.
410+ return true ;
410411 }
411412
412413 // Note that default compression type (PASS_THROUGH for metric and LZ4 for dimension) is not considered if the
@@ -476,12 +477,8 @@ private void rewriteForwardIndexForCompressionChange(String column, SegmentDirec
476477 // called during segmentWriter.close().
477478 segmentWriter .removeIndex (column , StandardIndexes .forward ());
478479 LoaderUtils .writeIndexToV3Format (segmentWriter , column , fwdIndexFile , StandardIndexes .forward ());
479- Map <String , String > metadataProperties = new HashMap <>();
480- BaseSegmentCreator .addForwardIndexCompressionCodecInfo (metadataProperties , column ,
480+ updateForwardIndexCompressionCodecMetadata (column ,
481481 _fieldIndexConfigs .get (column ).getConfig (StandardIndexes .forward ()), hasDictionary );
482- if (!metadataProperties .isEmpty ()) {
483- SegmentMetadataUtils .updateMetadataProperties (_segmentDirectory , metadataProperties );
484- }
485482
486483 // Delete the marker file.
487484 FileUtils .deleteQuietly (inProgress );
@@ -919,6 +916,7 @@ private void createDictBasedForwardIndex(String column, SegmentDirectory.Writer
919916 metadataProperties .put (getKeyFor (column , CARDINALITY ), String .valueOf (cardinality ));
920917 metadataProperties .put (getKeyFor (column , BITS_PER_ELEMENT ),
921918 String .valueOf (PinotDataBitSet .getNumBitsPerValue (cardinality - 1 )));
919+ metadataProperties .put (getKeyFor (column , FORWARD_INDEX_COMPRESSION_CODEC ), null );
922920 SegmentMetadataUtils .updateMetadataProperties (_segmentDirectory , metadataProperties );
923921
924922 // We remove indexes that have to be rewritten when a dictEnabled is toggled. Note that the respective index
@@ -971,14 +969,17 @@ private void disableDictionaryAndCreateRawForwardIndex(String column, SegmentDir
971969
972970 LOGGER .info ("Created raw forwardIndex. Updating metadata properties for segment={} and column={}" , segmentName ,
973971 column );
974- Map <String , String > metadataProperties = new HashMap <>();
975- metadataProperties .put (getKeyFor (column , HAS_DICTIONARY ), String .valueOf (false ));
976- metadataProperties .put (getKeyFor (column , DICTIONARY_ELEMENT_SIZE ), String .valueOf (0 ));
977- BaseSegmentCreator .addForwardIndexCompressionCodecInfo (metadataProperties , column ,
972+ PropertiesConfiguration metadataProperties =
973+ SegmentMetadataUtils .getPropertiesConfiguration (_segmentDirectory .getSegmentMetadata ());
974+ metadataProperties .setProperty (getKeyFor (column , HAS_DICTIONARY ), String .valueOf (false ));
975+ metadataProperties .setProperty (getKeyFor (column , DICTIONARY_ELEMENT_SIZE ), String .valueOf (0 ));
976+ updateForwardIndexCompressionCodecMetadata (metadataProperties , column ,
978977 _fieldIndexConfigs .get (column ).getConfig (StandardIndexes .forward ()), false );
979978 // TODO: See https://github.com/apache/pinot/pull/16921 for details
980- // metadataProperties.put(getKeyFor(column, BITS_PER_ELEMENT), String.valueOf(-1));
981- SegmentMetadataUtils .updateMetadataProperties (_segmentDirectory , metadataProperties );
979+ // metadataProperties.setProperty(getKeyFor(column, BITS_PER_ELEMENT), String.valueOf(-1));
980+ SegmentMetadataUtils .savePropertiesConfiguration (metadataProperties , _segmentDirectory .getSegmentMetadata ()
981+ .getIndexDir ());
982+ _segmentDirectory .reloadMetadata ();
982983
983984 // Remove range index, inverted index and FST index.
984985 removeDictRelatedIndexes (column , segmentWriter );
@@ -989,6 +990,28 @@ private void disableDictionaryAndCreateRawForwardIndex(String column, SegmentDir
989990 LOGGER .info ("Created raw based forward index for segment: {}, column: {}" , segmentName , column );
990991 }
991992
993+ private void updateForwardIndexCompressionCodecMetadata (String column ,
994+ @ Nullable ForwardIndexConfig forwardIndexConfig , boolean hasDictionary )
995+ throws Exception {
996+ PropertiesConfiguration metadataProperties =
997+ SegmentMetadataUtils .getPropertiesConfiguration (_segmentDirectory .getSegmentMetadata ());
998+ updateForwardIndexCompressionCodecMetadata (metadataProperties , column , forwardIndexConfig , hasDictionary );
999+ SegmentMetadataUtils .savePropertiesConfiguration (metadataProperties , _segmentDirectory .getSegmentMetadata ()
1000+ .getIndexDir ());
1001+ _segmentDirectory .reloadMetadata ();
1002+ }
1003+
1004+ private static void updateForwardIndexCompressionCodecMetadata (PropertiesConfiguration metadataProperties ,
1005+ String column , @ Nullable ForwardIndexConfig forwardIndexConfig , boolean hasDictionary ) {
1006+ String metadataKey = getKeyFor (column , FORWARD_INDEX_COMPRESSION_CODEC );
1007+ if (!hasDictionary && forwardIndexConfig != null && forwardIndexConfig .getCompressionCodecSpec () != null
1008+ && forwardIndexConfig .getCompressionCodecSpec ().hasLevel ()) {
1009+ metadataProperties .setProperty (metadataKey , forwardIndexConfig .getCompressionCodecSpec ().toConfigString ());
1010+ } else {
1011+ metadataProperties .clearProperty (metadataKey );
1012+ }
1013+ }
1014+
9921015 private void rewriteDictToRawForwardIndex (ColumnMetadata columnMetadata , SegmentDirectory .Writer segmentWriter ,
9931016 File indexDir )
9941017 throws Exception {
0 commit comments