Pipe: support multiple path patterns under tree model #16435
Pull Request Overview
Adds foundational support for multiple path patterns in the IoTDB pipe source configuration under the tree model. Currently, the system only supports single path patterns, but this PR begins the architectural changes needed to handle multiple patterns.
- Refactors `TreePattern.parsePipePatternFromSourceParameters()` to return a list of patterns instead of a single pattern
- Introduces a wrapper class `DataRegionSourceWithPattern` to associate sources with their specific patterns
- Updates pattern matching logic to handle multiple patterns while maintaining temporary single-pattern restrictions
Reviewed Changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| TreePattern.java | Modified parser to return List and support comma-separated paths |
| IoTDBNonDataRegionSource.java | Added validation to restrict to single pattern until full support is implemented |
| DataRegionSourceWithPattern.java | New wrapper class for associating sources with patterns |
| PipeRealtimeDataRegionSource.java | Changed internal pattern storage from single to list |
| CachedSchemaPatternMatcher.java | Updated matching logic to work with pattern lists and wrapper class |
| PipeDataRegionAssigner.java | Modified to use wrapper class for pattern-specific assignments |
| PipeHistoricalDataRegionTsFileAndDeletionSource.java | Updated to use pattern lists with TODO markers |
| IoTDBDataRegionSource.java | Updated validation to iterate over pattern list |
| DataRegionListeningFilter.java | Modified filtering logic to check any matching pattern |
| CachedSchemaPatternMatcherTest.java | Updated test to use new wrapper class API |
| for (final String singlePath : path.split(",")) { | ||
| if (!singlePath.trim().isEmpty()) { | ||
| patterns.add(new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, singlePath.trim())); | ||
| } | ||
| } |
The splitting logic on comma assumes no escaping mechanism for commas within paths. Consider using a more robust parsing approach or documenting that commas cannot be part of path names.
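One way to make the split more robust is sketched below. It assumes that backtick quoting is the escaping convention for path nodes containing special characters, which this thread does not confirm; `PathListSplitter` and its methods are hypothetical names, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not in the PR): splits a comma-separated path list. */
final class PathListSplitter {
  private PathListSplitter() {}

  /**
   * Splits on commas that are outside backtick-quoted nodes.
   * Assumption: a node such as `a,b` is quoted with backticks.
   */
  static List<String> split(final String paths) {
    final List<String> result = new ArrayList<>();
    final StringBuilder current = new StringBuilder();
    boolean inBackticks = false;
    for (int i = 0; i < paths.length(); i++) {
      final char c = paths.charAt(i);
      if (c == '`') {
        // Toggle quoting state; keep the backtick as part of the path.
        inBackticks = !inBackticks;
        current.append(c);
      } else if (c == ',' && !inBackticks) {
        // An unquoted comma ends the current path.
        addIfNotBlank(result, current);
      } else {
        current.append(c);
      }
    }
    addIfNotBlank(result, current);
    return result;
  }

  // Adds the trimmed buffer to the result unless it is blank, then resets it.
  private static void addIfNotBlank(final List<String> result, final StringBuilder current) {
    final String trimmed = current.toString().trim();
    if (!trimmed.isEmpty()) {
      result.add(trimmed);
    }
    current.setLength(0);
  }
}
```

If commas are simply disallowed in path names, documenting that restriction next to the `split(",")` call would be the lighter alternative.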
| public DataRegionSourceWithPattern(final PipeRealtimeDataRegionSource source) { | ||
| this.source = source; | ||
| // TODO: handle multiple patterns | ||
| this.treePattern = source.getTreePatterns().get(0); |
Potential IndexOutOfBoundsException if source.getTreePatterns() returns an empty list. Add bounds checking before accessing index 0.
| this.treePattern = source.getTreePatterns().get(0); | |
| if (source.getTreePatterns() != null && !source.getTreePatterns().isEmpty()) { | |
| this.treePattern = source.getTreePatterns().get(0); | |
| } else { | |
| this.treePattern = null; | |
| } |
| creationTime, | ||
| pipeTaskMeta, | ||
| treePattern, | ||
| treePatterns.get(0), // TODO |
Potential IndexOutOfBoundsException if treePatterns is empty. Add bounds checking before accessing index 0.
| treePatterns.get(0), // TODO | |
| treePatterns.isEmpty() ? null : treePatterns.get(0), // Safe: pass null if empty |
| creationTime, | ||
| pipeTaskMeta, | ||
| treePattern, | ||
| treePatterns.get(0), // TODO |
Potential IndexOutOfBoundsException if treePatterns is empty. Add bounds checking before accessing index 0.
| for (final PipeRealtimeDataRegionSource source : sources) { | ||
| if (!matchedSources.contains(source)) { | ||
| unmatchedSources.add(source); | ||
| // TODO: improve performance | ||
| if (!matchedSources.stream() | ||
| .map(DataRegionSourceWithPattern::getSource) | ||
| .collect(Collectors.toSet()) | ||
| .contains(source)) { |
Converting stream to Set for contains check is inefficient. Consider using anyMatch() or maintaining a Set for faster lookups.
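Both alternatives mentioned in this comment can be sketched as follows. The nested `Source` and `SourceWithPattern` classes are simplified stand-ins for `PipeRealtimeDataRegionSource` and `DataRegionSourceWithPattern`, and the method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Illustrative only: stand-in types, not the classes from the PR. */
final class MatchedSourceLookup {
  private MatchedSourceLookup() {}

  /** Stand-in for PipeRealtimeDataRegionSource. */
  static final class Source {
    final String name;

    Source(final String name) {
      this.name = name;
    }
  }

  /** Stand-in for DataRegionSourceWithPattern. */
  static final class SourceWithPattern {
    private final Source source;
    private final String pattern;

    SourceWithPattern(final Source source, final String pattern) {
      this.source = source;
      this.pattern = pattern;
    }

    Source getSource() {
      return source;
    }
  }

  /** Option 1: short-circuits on the first hit and builds no intermediate Set. */
  static boolean isMatched(final List<SourceWithPattern> matched, final Source source) {
    return matched.stream().map(SourceWithPattern::getSource).anyMatch(source::equals);
  }

  /** Option 2: builds the Set once, outside the loop, then does O(1) lookups. */
  static List<Source> unmatched(final List<Source> sources, final List<SourceWithPattern> matched) {
    final Set<Source> matchedSources =
        matched.stream().map(SourceWithPattern::getSource).collect(Collectors.toSet());
    final List<Source> result = new ArrayList<>();
    for (final Source source : sources) {
      if (!matchedSources.contains(source)) {
        result.add(source);
      }
    }
    return result;
  }
}
```

The second option tends to pay off when the loop runs over many sources, since the Set is collected once rather than once per iteration.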
Pull Request Overview
Copilot reviewed 35 out of 35 changed files in this pull request and generated 4 comments.
| + treePattern | ||
| + "', tablePattern='" | ||
| + treePatterns | ||
| + "', tablePatterns='" |
The field name in the string should be 'tablePattern' to match the actual field name, not 'tablePatterns'.
| + "', tablePatterns='" | |
| + "', tablePattern='" |
|
|
||
| private static List<TreePattern> parseMultiplePatterns( | ||
| final String pattern, final Function<String, TreePattern> patternSupplier) { | ||
| if (pattern.isEmpty()) { |
This condition checks if the pattern string is empty, but an empty string should likely be handled differently than creating a pattern with an empty string. Consider checking for null or whitespace-only strings instead.
| if (pattern.isEmpty()) { | |
| if (pattern == null || pattern.trim().isEmpty()) { |
| } else { | ||
| isDbNameCoveredByPattern = treePattern.coversDb(databaseName); | ||
| isDbNameCoveredByPattern = | ||
| treePatterns.stream().allMatch(treePattern -> treePattern.coversDb(databaseName)); |
Using allMatch() here means the database is only considered covered if ALL patterns cover it. This seems incorrect - the database should be covered if ANY pattern covers it. Consider using anyMatch() instead.
| treePatterns.stream().allMatch(treePattern -> treePattern.coversDb(databaseName)); | |
| treePatterns.stream().anyMatch(treePattern -> treePattern.coversDb(databaseName)); |
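The difference between the two quantifiers can be illustrated with a small sketch. The `coversDb` method here is a deliberately simplified stand-in (it only understands patterns ending in `.**`) and does not reproduce the real `TreePattern.coversDb` semantics; which quantifier the PR actually needs depends on what `isDbNameCoveredByPattern` is used for downstream.

```java
import java.util.List;

/** Illustrative only: simplified coverage rule, not TreePattern.coversDb. */
final class CoverageCheck {
  private CoverageCheck() {}

  /** Assumption: "root.db1.**" covers database "root.db1" and anything below it. */
  static boolean coversDb(final String pattern, final String databaseName) {
    if (!pattern.endsWith(".**")) {
      return false;
    }
    final String prefix = pattern.substring(0, pattern.length() - 3);
    return databaseName.equals(prefix) || databaseName.startsWith(prefix + ".");
  }

  /** True only if every pattern covers the database (vacuously true for an empty list). */
  static boolean allCover(final List<String> patterns, final String databaseName) {
    return patterns.stream().allMatch(p -> coversDb(p, databaseName));
  }

  /** True if at least one pattern covers the database (false for an empty list). */
  static boolean anyCover(final List<String> patterns, final String databaseName) {
    return patterns.stream().anyMatch(p -> coversDb(p, databaseName));
  }
}
```

With the patterns `root.db1.**` and `root.db2.**`, the database `root.db1` is covered under `anyCover` but not under `allCover`. The two also differ on an empty pattern list: `allMatch` returns true and `anyMatch` returns false.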
| } else { | ||
| isDbNameCoveredByPattern = treePattern.coversDb(databaseName); | ||
| isDbNameCoveredByPattern = | ||
| treePatterns.stream().allMatch(treePattern -> treePattern.coversDb(databaseName)); |
Same issue as in PipeRealtimeDataRegionSource - using allMatch() here means the database is only considered covered if ALL patterns cover it. This should likely be anyMatch() to match if any pattern covers the database.
| treePatterns.stream().allMatch(treePattern -> treePattern.coversDb(databaseName)); | |
| treePatterns.stream().anyMatch(treePattern -> treePattern.coversDb(databaseName)); |
This reverts commit 09a79c3.
Pull Request Overview
Copilot reviewed 36 out of 36 changed files in this pull request and generated 3 comments.
| private static List<TreePattern> parseMultiplePatterns( | ||
| final String pattern, final Function<String, TreePattern> patternSupplier) { | ||
| if (pattern.isEmpty()) { | ||
| return Collections.singletonList(patternSupplier.apply(pattern)); |
This condition will cause empty patterns to be processed as valid patterns. An empty pattern should likely be filtered out or handled differently to avoid creating meaningless pattern matchers.
| return Collections.singletonList(patternSupplier.apply(pattern)); | |
| // Return an empty list if the pattern is empty, to avoid creating meaningless pattern matchers. | |
| return Collections.emptyList(); |
| Objects.isNull(treePatterns) | ||
| || ((treePatterns.stream() | ||
| .allMatch(treePattern -> treePattern == null || treePattern.isRoot())) |
The nested null checks and stream operations make this logic complex. Consider extracting this into a private helper method like areAllPatternsRootOrNull() for better readability.
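A minimal sketch of such a helper is shown below. It covers only the part of the condition visible in the diff above; the `Pattern` interface is a stand-in for `TreePattern`, and the enclosing class name is hypothetical.

```java
import java.util.List;
import java.util.Objects;

/** Illustrative only: Pattern is a stand-in for TreePattern. */
final class PatternChecks {
  private PatternChecks() {}

  /** Minimal stand-in exposing only the method used by the check. */
  interface Pattern {
    boolean isRoot();
  }

  /** True if the list is null, or every element is either null or a root pattern. */
  static boolean areAllPatternsRootOrNull(final List<? extends Pattern> patterns) {
    return Objects.isNull(patterns)
        || patterns.stream().allMatch(pattern -> pattern == null || pattern.isRoot());
  }
}
```

The call site then reads as a single named condition, and the null handling lives in one place.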
| } | ||
| return initEventParser().convertToTablet(); | ||
| // TODO: handle multiple patterns | ||
| return initEventParsers().get(0).convertToTablet(); |
Using get(0) without checking if the list is empty will cause an IndexOutOfBoundsException. Add a null/empty check or handle the case where no patterns exist.
| return initEventParsers().get(0).convertToTablet(); | |
| List<TabletInsertionEventParser> parsers = initEventParsers(); | |
| if (parsers == null || parsers.isEmpty()) { | |
| // Handle the case where no parser exists; return null or throw an exception as appropriate | |
| return null; | |
| } | |
| return parsers.get(0).convertToTablet(); |
| } else { | ||
| isDbNameCoveredByPattern = treePattern.coversDb(databaseName); | ||
| isDbNameCoveredByPattern = | ||
| treePatterns.stream().allMatch(treePattern -> treePattern.coversDb(databaseName)); |
| return parseConfigPlan( | ||
| ((PipeConfigRegionWritePlanEvent) event).getConfigPhysicalPlan(), | ||
| treePattern, | ||
| // TODO: handle multiple patterns |
| public Iterable<TabletInsertionEvent> processRowByRow( | ||
| final BiConsumer<Row, RowCollector> consumer) { | ||
| return initEventParser().processRowByRow(consumer); | ||
| return initEventParsers().stream() | ||
| .map(tabletInsertionEventParser -> tabletInsertionEventParser.processRowByRow(consumer)) | ||
| .flatMap(Collection::stream) | ||
| .collect(Collectors.toList()); | ||
| } | ||
|
|
||
| @Override | ||
| public Iterable<TabletInsertionEvent> processTablet( | ||
| final BiConsumer<Tablet, RowCollector> consumer) { | ||
| return initEventParser().processTablet(consumer); | ||
| return initEventParsers().stream() | ||
| .map(tabletInsertionEventParser -> tabletInsertionEventParser.processTablet(consumer)) | ||
| .flatMap(Collection::stream) | ||
| .collect(Collectors.toList()); | ||
| } |
Will an insertion be sent twice if I define patterns like:
root.db1.d1.s1
root.db1.d1.*
| final Iterator<TsFileInsertionEventParser> parserIterator = initEventParsers().iterator(); | ||
| return new Iterator<TabletInsertionEvent>() { | ||
| private TsFileInsertionEventParser currentParser = null; | ||
| private Iterator<TabletInsertionEvent> currentEventIterator = Collections.emptyIterator(); | ||
|
|
||
| private void closeCurrentParser() { | ||
| if (Objects.nonNull(currentParser)) { | ||
| currentParser.close(); | ||
| currentParser = null; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public boolean hasNext() { | ||
| while (!currentEventIterator.hasNext() && parserIterator.hasNext()) { | ||
| closeCurrentParser(); | ||
| currentParser = parserIterator.next(); | ||
| currentEventIterator = currentParser.toTabletInsertionEvents().iterator(); | ||
| } | ||
|
|
||
| if (!currentEventIterator.hasNext()) { | ||
| closeCurrentParser(); | ||
| } | ||
|
|
||
| return currentEventIterator.hasNext(); | ||
| } | ||
|
|
||
| @Override | ||
| public TabletInsertionEvent next() { | ||
| if (!hasNext()) { | ||
| throw new NoSuchElementException(); | ||
| } | ||
| return currentEventIterator.next(); | ||
| } | ||
| }; | ||
| }; |
I would suggest putting all the patterns inside a single parser, instead of creating one parser per pattern, because:
- overlapping patterns, like root.db1.** and root.db1.d1.**, may produce redundant results;
- each pattern may result in a separate traversal of the TsFile, which could be inefficient.
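The idea of one pass with several patterns can be sketched roughly as follows. `MultiPatternFilter` is a hypothetical class, and its matching rules (exact path, trailing `.*` for one level, trailing `.**` for any depth) are a simplification rather than IoTDB's actual pattern semantics.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: one filter holding every pattern, applied in a single pass. */
final class MultiPatternFilter {
  private final List<String> patterns;

  MultiPatternFilter(final List<String> patterns) {
    this.patterns = new ArrayList<>(patterns);
  }

  /** A path matches if any pattern accepts it. */
  boolean matches(final String path) {
    for (final String pattern : patterns) {
      if (matchesOne(pattern, path)) {
        return true;
      }
    }
    return false;
  }

  /** Single traversal: each input path is emitted at most once. */
  List<String> filter(final List<String> paths) {
    final List<String> result = new ArrayList<>();
    for (final String path : paths) {
      if (matches(path)) {
        result.add(path);
      }
    }
    return result;
  }

  // Simplified rules (assumption): ".**" = any depth, ".*" = exactly one level.
  private static boolean matchesOne(final String pattern, final String path) {
    if (pattern.endsWith(".**")) {
      // Keep the trailing dot so "root.db1.**" does not match "root.db10.x".
      final String prefix = pattern.substring(0, pattern.length() - 2);
      return path.startsWith(prefix) && path.length() > prefix.length();
    }
    if (pattern.endsWith(".*")) {
      final String prefix = pattern.substring(0, pattern.length() - 1);
      return path.startsWith(prefix)
          && path.length() > prefix.length()
          && path.indexOf('.', prefix.length()) < 0;
    }
    return pattern.equals(path);
  }
}
```

Because the check is "does any pattern match" per path, overlapping patterns such as root.db1.d1.s1 and root.db1.d1.* yield each matching path once, and the input is traversed a single time regardless of how many patterns are configured.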
As title.