Skip to content

Commit 9bb5fed

Browse files
authored
fix issue with nested column processing/storage of empty fields (#19072)
* fix issue with nested column processing/storage of empty fields

  changes:
  * fix bug in `NestedPathFinder.toNormalizedJsonPath` creating incorrect paths when given an empty field name
  * fix `NestedPathFinder.parseJsonPath` to correctly detect illegal empty paths consisting of consecutive `.` characters
  * added `NestedPathFinder.parseBadJsonPath` to read nested column field dictionaries and detect and fix up illegal path expressions, using a newly added `FieldsFixupIndexed` to swap the bad values with good values
  * `NestedDataColumnSupplier` now attempts, on column read, to detect bad paths written by the buggy version of `NestedPathFinder.toNormalizedJsonPath`
  * added a `pathParserVersion` field to the nested column part serde so that nested columns written after the bug fix can skip checking for the bug
* fixes
* better name for parameter
* fix javadoc
* add serde test
* adjust
* add better test with prebuilt segment containing the buggy field names
1 parent 010be96 commit 9bb5fed

13 files changed

Lines changed: 880 additions & 482 deletions

processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java

Lines changed: 150 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@
1919

2020
package org.apache.druid.segment.nested;
2121

22+
import com.google.common.annotations.VisibleForTesting;
2223
import com.google.common.base.Supplier;
24+
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
25+
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
2326
import org.apache.druid.collections.bitmap.ImmutableBitmap;
27+
import org.apache.druid.error.DruidException;
2428
import org.apache.druid.java.util.common.RE;
2529
import org.apache.druid.java.util.common.StringUtils;
30+
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
2631
import org.apache.druid.segment.column.ColumnBuilder;
2732
import org.apache.druid.segment.column.ColumnConfig;
2833
import org.apache.druid.segment.column.ColumnIndexSupplier;
@@ -39,16 +44,20 @@
3944
import org.apache.druid.segment.serde.ColumnSerializerUtils;
4045
import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
4146

47+
import javax.annotation.Nonnull;
4248
import javax.annotation.Nullable;
4349
import java.io.IOException;
4450
import java.nio.ByteBuffer;
4551
import java.nio.ByteOrder;
52+
import java.util.Iterator;
53+
import java.util.List;
4654

4755
public class NestedDataColumnSupplier implements Supplier<NestedCommonFormatColumn>, ColumnIndexSupplier
4856
{
4957
public static NestedDataColumnSupplier read(
5058
ColumnType logicalType,
5159
boolean hasNulls,
60+
byte pathParserVersion,
5261
ByteBuffer bb,
5362
ColumnBuilder columnBuilder,
5463
ColumnConfig columnConfig,
@@ -82,7 +91,11 @@ public static NestedDataColumnSupplier read(
8291
doubleDictionarySupplier = parent.doubleDictionarySupplier;
8392
arrayDictionarySupplier = parent.arrayDictionarySupplier;
8493
} else {
85-
fieldsSupplier = StringEncodingStrategies.getStringDictionarySupplier(mapper, bb, byteOrder);
94+
if (pathParserVersion == 0x00) {
95+
fieldsSupplier = getAndFixFieldsSupplier(bb, byteOrder, mapper);
96+
} else {
97+
fieldsSupplier = StringEncodingStrategies.getStringDictionarySupplier(mapper, bb, byteOrder);
98+
}
8699
fieldInfo = FieldTypeInfo.read(bb, fieldsSupplier.get().size());
87100
final ByteBuffer stringDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
88101
mapper,
@@ -182,6 +195,52 @@ public static NestedDataColumnSupplier read(
182195
}
183196
}
184197

198+
199+
/**
200+
* Detects whether the field dictionary contains any invalid entries caused by a bug which previously existed in
201+
* {@link NestedPathFinder#toNormalizedJsonPath(List)}, which generated invalid path expressions when given empty
202+
* field names - for example {"":{"a":1}} would be incorrectly stored as the path $..a instead of $[''].a.
203+
* <p>
204+
* If this method detects any illegal paths, the field dictionary is wrapped using {@link FieldsFixupIndexed}, which
205+
* replaces the invalid values with corrected values, using {@link NestedPathFinder#parseBadJsonPath(String)} and
206+
* feeding that back into the now fixed {@link NestedPathFinder#toNormalizedJsonPath(List)}.
207+
* <p>
208+
* Columns written after the bug was fixed will store {@link NestedCommonFormatColumnPartSerde#pathParserVersion} as
209+
* 0x01 or greater, to indicate that we do not need to call this method to check for fixing up paths.
210+
* <p>
211+
* see https://github.com/apache/druid/pull/19072 for additional details.
212+
*/
213+
@VisibleForTesting
214+
static Supplier<? extends Indexed<ByteBuffer>> getAndFixFieldsSupplier(
215+
ByteBuffer bb,
216+
ByteOrder byteOrder,
217+
SegmentFileMapper mapper
218+
)
219+
{
220+
final Supplier<? extends Indexed<ByteBuffer>> fieldsSupplier;
221+
Supplier<? extends Indexed<ByteBuffer>> _fieldsSupplier =
222+
StringEncodingStrategies.getStringDictionarySupplier(mapper, bb, byteOrder);
223+
// check for existence of bug to detect if we need a fixup adapter or not
224+
Indexed<ByteBuffer> fields = _fieldsSupplier.get();
225+
Int2ObjectMap<ByteBuffer> fixupMap = new Int2ObjectOpenHashMap<>();
226+
for (int i = 0; i < fields.size(); i++) {
227+
String path = StringUtils.fromUtf8Nullable(fields.get(i));
228+
try {
229+
NestedPathFinder.parseJsonPath(path);
230+
}
231+
catch (DruidException d) {
232+
String fixed = NestedPathFinder.toNormalizedJsonPath(NestedPathFinder.parseBadJsonPath(path));
233+
fixupMap.put(i, StringUtils.toUtf8ByteBuffer(fixed));
234+
}
235+
}
236+
if (fixupMap.isEmpty()) {
237+
fieldsSupplier = _fieldsSupplier;
238+
} else {
239+
fieldsSupplier = () -> new FieldsFixupIndexed(_fieldsSupplier.get(), fixupMap);
240+
}
241+
return fieldsSupplier;
242+
}
243+
185244
private final String columnName;
186245
private final Supplier<? extends Indexed<ByteBuffer>> fieldSupplier;
187246
private final FieldTypeInfo fieldInfo;
@@ -268,4 +327,94 @@ public <T> T as(Class<T> clazz)
268327
}
269328
return null;
270329
}
330+
331+
/**
332+
* {@link Indexed} implementation which contains a map of positions to replace with corrected values by
333+
* {@link #getAndFixFieldsSupplier(ByteBuffer, ByteOrder, SegmentFileMapper)}.
334+
* <p>
335+
* This implementation is no longer {@link #isSorted()}, despite the underlying {@link Indexed} being so, as the
336+
* replaced values might violate the old sort order, so this cannot provide that guarantee. Despite this,
337+
* {@link #indexOf(ByteBuffer)} will still function for finding whether items exist in the dictionary: the replaced values
338+
* are searched prior to calling {@link #indexOf(ByteBuffer)} on the underlying dictionary, so the values we are
339+
* trying to find will always report their correct positions.
340+
* It is important that the fixed dictionary retains its original order because the positions are used as the internal
341+
* field file names (instead of the paths themselves) in the segment file. Callers however should not expect
342+
* iterating the dictionary to provide values in sorted order.
343+
*/
344+
@VisibleForTesting
345+
public static class FieldsFixupIndexed implements Indexed<ByteBuffer>
346+
{
347+
private final Indexed<ByteBuffer> delegate;
348+
private final Int2ObjectMap<ByteBuffer> fixup;
349+
350+
private FieldsFixupIndexed(Indexed<ByteBuffer> delegate, Int2ObjectMap<ByteBuffer> fixup)
351+
{
352+
this.delegate = delegate;
353+
this.fixup = fixup;
354+
}
355+
356+
@Override
357+
public int size()
358+
{
359+
return delegate.size();
360+
}
361+
362+
@Nullable
363+
@Override
364+
public ByteBuffer get(int index)
365+
{
366+
if (fixup.containsKey(index)) {
367+
return fixup.get(index).asReadOnlyBuffer();
368+
}
369+
return delegate.get(index);
370+
}
371+
372+
@Override
373+
public int indexOf(@Nullable ByteBuffer value)
374+
{
375+
for (Int2ObjectMap.Entry<ByteBuffer> entry : fixup.int2ObjectEntrySet()) {
376+
if (entry.getValue().equals(value)) {
377+
return entry.getIntKey();
378+
}
379+
}
380+
return delegate.indexOf(value);
381+
}
382+
383+
@Nonnull
384+
@Override
385+
public Iterator<ByteBuffer> iterator()
386+
{
387+
return new Iterator<>()
388+
{
389+
int pos = 0;
390+
final int size = delegate.size();
391+
final Iterator<ByteBuffer> delegateIterator = delegate.iterator();
392+
393+
@Override
394+
public boolean hasNext()
395+
{
396+
return pos < size;
397+
}
398+
399+
@Override
400+
public ByteBuffer next()
401+
{
402+
if (fixup.containsKey(pos)) {
403+
// move delegate iterator forward, but we're going to return our own value
404+
delegateIterator.next();
405+
// this is sad, but downstream stuff wants ByteBuffer, and is less sad than the original bug
406+
return fixup.get(pos++).asReadOnlyBuffer();
407+
}
408+
pos++;
409+
return delegateIterator.next();
410+
}
411+
};
412+
}
413+
414+
@Override
415+
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
416+
{
417+
418+
}
419+
}
271420
}

processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public static NestedDataColumnSupplierV4 read(
8080
try {
8181
final SegmentFileMapper mapper = columnBuilder.getFileMapper();
8282
final ComplexColumnMetadata metadata;
83-
final GenericIndexed<ByteBuffer> fields;
83+
final Supplier<? extends Indexed<ByteBuffer>> fields;
8484
final FieldTypeInfo fieldInfo;
8585
final CompressedVariableSizedBlobColumnSupplier compressedRawColumnSupplier;
8686
final ImmutableBitmap nullValues;
@@ -95,17 +95,23 @@ public static NestedDataColumnSupplierV4 read(
9595
IndexMerger.SERIALIZER_UTILS.readString(bb),
9696
ComplexColumnMetadata.class
9797
);
98-
fields = GenericIndexed.read(bb, GenericIndexed.UTF8_STRATEGY, mapper);
99-
fieldInfo = FieldTypeInfo.read(bb, fields.size());
98+
if (version < 0x04) {
99+
// older than v4 uses jq paths
100+
fields = GenericIndexed.read(bb, GenericIndexed.UTF8_STRATEGY, mapper)::singleThreaded;
101+
} else {
102+
fields = NestedDataColumnSupplier.getAndFixFieldsSupplier(bb, metadata.getByteOrder(), mapper);
103+
}
104+
Indexed<ByteBuffer> fieldsDict = fields.get();
105+
fieldInfo = FieldTypeInfo.read(bb, fieldsDict.size());
100106

101-
if (fields.size() == 0) {
107+
if (fieldsDict.size() == 0) {
102108
// all nulls, in the future we'll deal with this better... but for now lets just call it a string because
103109
// it is the most permissive (besides json)
104110
simpleType = ColumnType.STRING;
105-
} else if (fields.size() == 1 &&
106-
((version == 0x03 && NestedPathFinder.JQ_PATH_ROOT.equals(StringUtils.fromUtf8(fields.get(0)))) ||
111+
} else if (fieldsDict.size() == 1 &&
112+
((version == 0x03 && NestedPathFinder.JQ_PATH_ROOT.equals(StringUtils.fromUtf8(fieldsDict.get(0)))) ||
107113
((version == 0x04 || version == 0x05)
108-
&& NestedPathFinder.JSON_PATH_ROOT.equals(StringUtils.fromUtf8(fields.get(0)))))
114+
&& NestedPathFinder.JSON_PATH_ROOT.equals(StringUtils.fromUtf8(fieldsDict.get(0)))))
109115
) {
110116
simpleType = fieldInfo.getTypes(0).getSingleType();
111117
} else {
@@ -213,7 +219,7 @@ public static NestedDataColumnSupplierV4 read(
213219
private final byte version;
214220
private final String columnName;
215221
private final ColumnConfig columnConfig;
216-
private final GenericIndexed<ByteBuffer> fields;
222+
private final Supplier<? extends Indexed<ByteBuffer>> fieldsSupplier;
217223
private final FieldTypeInfo fieldInfo;
218224
private final CompressedVariableSizedBlobColumnSupplier compressedRawColumnSupplier;
219225
private final ImmutableBitmap nullValues;
@@ -233,7 +239,7 @@ private NestedDataColumnSupplierV4(
233239
byte version,
234240
String columnName,
235241
ColumnConfig columnConfig,
236-
GenericIndexed<ByteBuffer> fields,
242+
Supplier<? extends Indexed<ByteBuffer>> fieldsSupplier,
237243
FieldTypeInfo fieldInfo,
238244
CompressedVariableSizedBlobColumnSupplier compressedRawColumnSupplier,
239245
ImmutableBitmap nullValues,
@@ -250,7 +256,7 @@ private NestedDataColumnSupplierV4(
250256
this.version = version;
251257
this.columnName = columnName;
252258
this.columnConfig = columnConfig;
253-
this.fields = fields;
259+
this.fieldsSupplier = fieldsSupplier;
254260
this.fieldInfo = fieldInfo;
255261
this.compressedRawColumnSupplier = compressedRawColumnSupplier;
256262
this.nullValues = nullValues;
@@ -291,7 +297,7 @@ private NestedDataColumnV3 makeV3()
291297
columnConfig,
292298
compressedRawColumnSupplier,
293299
nullValues,
294-
fields,
300+
fieldsSupplier,
295301
fieldInfo,
296302
stringDictionarySupplier,
297303
longDictionarySupplier,
@@ -310,7 +316,7 @@ private NestedDataColumnV4 makeV4()
310316
columnConfig,
311317
compressedRawColumnSupplier,
312318
nullValues,
313-
fields,
319+
fieldsSupplier,
314320
fieldInfo,
315321
stringDictionarySupplier,
316322
longDictionarySupplier,
@@ -329,7 +335,7 @@ private NestedDataColumnV5 makeV5()
329335
columnConfig,
330336
compressedRawColumnSupplier,
331337
nullValues,
332-
fields::singleThreaded,
338+
fieldsSupplier,
333339
fieldInfo,
334340
stringDictionarySupplier,
335341
longDictionarySupplier,

processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnV3.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,24 +26,23 @@
2626
import org.apache.druid.segment.column.ColumnType;
2727
import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSupplier;
2828
import org.apache.druid.segment.data.FixedIndexed;
29-
import org.apache.druid.segment.data.GenericIndexed;
3029
import org.apache.druid.segment.data.Indexed;
3130
import org.apache.druid.segment.file.SegmentFileMapper;
3231

3332
import java.nio.ByteBuffer;
3433
import java.nio.ByteOrder;
3534
import java.util.List;
3635

37-
public final class NestedDataColumnV3<TStringDictionary extends Indexed<ByteBuffer>>
38-
extends CompressedNestedDataComplexColumn<Indexed<ByteBuffer>, TStringDictionary>
36+
public final class NestedDataColumnV3<TKeyDictionary extends Indexed<ByteBuffer>, TStringDictionary extends Indexed<ByteBuffer>>
37+
extends CompressedNestedDataComplexColumn<TKeyDictionary, TStringDictionary>
3938
{
4039
public NestedDataColumnV3(
4140
String columnName,
4241
ColumnType logicalType,
4342
ColumnConfig columnConfig,
4443
CompressedVariableSizedBlobColumnSupplier compressedRawColumnSupplier,
4544
ImmutableBitmap nullValues,
46-
GenericIndexed<ByteBuffer> fields,
45+
Supplier<TKeyDictionary> fieldsSupplier,
4746
FieldTypeInfo fieldInfo,
4847
Supplier<TStringDictionary> stringDictionary,
4948
Supplier<FixedIndexed<Long>> longDictionarySupplier,
@@ -59,7 +58,7 @@ public NestedDataColumnV3(
5958
columnConfig,
6059
compressedRawColumnSupplier,
6160
nullValues,
62-
fields::singleThreaded,
61+
fieldsSupplier,
6362
fieldInfo,
6463
stringDictionary,
6564
longDictionarySupplier,

processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnV4.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.druid.segment.column.ColumnType;
2626
import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSupplier;
2727
import org.apache.druid.segment.data.FixedIndexed;
28-
import org.apache.druid.segment.data.GenericIndexed;
2928
import org.apache.druid.segment.data.Indexed;
3029
import org.apache.druid.segment.file.SegmentFileMapper;
3130
import org.apache.druid.segment.serde.ColumnSerializerUtils;
@@ -34,16 +33,16 @@
3433
import java.nio.ByteOrder;
3534
import java.util.List;
3635

37-
public final class NestedDataColumnV4<TStringDictionary extends Indexed<ByteBuffer>>
38-
extends CompressedNestedDataComplexColumn<Indexed<ByteBuffer>, TStringDictionary>
36+
public final class NestedDataColumnV4<TKeyDictionary extends Indexed<ByteBuffer>, TStringDictionary extends Indexed<ByteBuffer>>
37+
extends CompressedNestedDataComplexColumn<TKeyDictionary, TStringDictionary>
3938
{
4039
public NestedDataColumnV4(
4140
String columnName,
4241
ColumnType logicalType,
4342
ColumnConfig columnConfig,
4443
CompressedVariableSizedBlobColumnSupplier compressedRawColumnSupplier,
4544
ImmutableBitmap nullValues,
46-
GenericIndexed<ByteBuffer> fields,
45+
Supplier<TKeyDictionary> fieldsSupplier,
4746
FieldTypeInfo fieldInfo,
4847
Supplier<TStringDictionary> stringDictionary,
4948
Supplier<FixedIndexed<Long>> longDictionarySupplier,
@@ -59,7 +58,7 @@ public NestedDataColumnV4(
5958
columnConfig,
6059
compressedRawColumnSupplier,
6160
nullValues,
62-
fields::singleThreaded,
61+
fieldsSupplier,
6362
fieldInfo,
6463
stringDictionary,
6564
longDictionarySupplier,

0 commit comments

Comments
 (0)