Skip to content

Pipe: support multiple path patterns under tree model#16435

Closed
VGalaxies wants to merge 24 commits into
apache:masterfrom
VGalaxies:multi-path
Closed

Pipe: support multiple path patterns under tree model#16435
VGalaxies wants to merge 24 commits into
apache:masterfrom
VGalaxies:multi-path

Conversation

@VGalaxies

@VGalaxies VGalaxies commented Sep 18, 2025

Copy link
Copy Markdown
Contributor

As title.

@VGalaxies VGalaxies requested a review from Copilot September 18, 2025 07:05

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Adds foundational support for multiple path patterns in the IoTDB pipe source configuration under the tree model. Currently, the system only supports single path patterns, but this PR begins the architectural changes needed to handle multiple patterns.

  • Refactors TreePattern.parsePipePatternFromSourceParameters() to return a list of patterns instead of a single pattern
  • Introduces a wrapper class DataRegionSourceWithPattern to associate sources with their specific patterns
  • Updates pattern matching logic to handle multiple patterns while maintaining temporary single-pattern restrictions

Reviewed Changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
TreePattern.java Modified parser to return List and support comma-separated paths
IoTDBNonDataRegionSource.java Added validation to restrict to single pattern until full support is implemented
DataRegionSourceWithPattern.java New wrapper class for associating sources with patterns
PipeRealtimeDataRegionSource.java Changed internal pattern storage from single to list
CachedSchemaPatternMatcher.java Updated matching logic to work with pattern lists and wrapper class
PipeDataRegionAssigner.java Modified to use wrapper class for pattern-specific assignments
PipeHistoricalDataRegionTsFileAndDeletionSource.java Updated to use pattern lists with TODO markers
IoTDBDataRegionSource.java Updated validation to iterate over pattern list
DataRegionListeningFilter.java Modified filtering logic to check any matching pattern
CachedSchemaPatternMatcherTest.java Updated test to use new wrapper class API

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment on lines +85 to +89
for (final String singlePath : path.split(",")) {
if (!singlePath.trim().isEmpty()) {
patterns.add(new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, singlePath.trim()));
}
}

Copilot AI Sep 18, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The splitting logic on comma assumes no escaping mechanism for commas within paths. Consider using a more robust parsing approach or documenting that commas cannot be part of path names.

Copilot uses AI. Check for mistakes.
public DataRegionSourceWithPattern(final PipeRealtimeDataRegionSource source) {
this.source = source;
// TODO: handle multiple patterns
this.treePattern = source.getTreePatterns().get(0);

Copilot AI Sep 18, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential IndexOutOfBoundsException if source.getTreePatterns() returns an empty list. Add bounds checking before accessing index 0.

Suggested change
this.treePattern = source.getTreePatterns().get(0);
if (source.getTreePatterns() != null && !source.getTreePatterns().isEmpty()) {
this.treePattern = source.getTreePatterns().get(0);
} else {
this.treePattern = null;
}

Copilot uses AI. Check for mistakes.
creationTime,
pipeTaskMeta,
treePattern,
treePatterns.get(0), // TODO

Copilot AI Sep 18, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential IndexOutOfBoundsException if treePatterns is empty. Add bounds checking before accessing index 0.

Suggested change
treePatterns.get(0), // TODO
treePatterns.isEmpty() ? null : treePatterns.get(0), // Safe: pass null if empty

Copilot uses AI. Check for mistakes.
creationTime,
pipeTaskMeta,
treePattern,
treePatterns.get(0), // TODO

Copilot AI Sep 18, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential IndexOutOfBoundsException if treePatterns is empty. Add bounds checking before accessing index 0.

Copilot uses AI. Check for mistakes.
Comment on lines +191 to +200
for (final PipeRealtimeDataRegionSource source : sources) {
if (!matchedSources.contains(source)) {
unmatchedSources.add(source);
// TODO: improve performance
if (!matchedSources.stream()
.map(DataRegionSourceWithPattern::getSource)
.collect(Collectors.toSet())
.contains(source)) {

Copilot AI Sep 18, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converting stream to Set for contains check is inefficient. Consider using anyMatch() or maintaining a Set for faster lookups.

Copilot uses AI. Check for mistakes.
@VGalaxies VGalaxies marked this pull request as ready for review September 25, 2025 08:15

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 35 out of 35 changed files in this pull request and generated 4 comments.


Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

+ treePattern
+ "', tablePattern='"
+ treePatterns
+ "', tablePatterns='"

Copilot AI Sep 25, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field name in the string should be 'tablePattern' to match the actual field name, not 'tablePatterns'.

Suggested change
+ "', tablePatterns='"
+ "', tablePattern='"

Copilot uses AI. Check for mistakes.

private static List<TreePattern> parseMultiplePatterns(
final String pattern, final Function<String, TreePattern> patternSupplier) {
if (pattern.isEmpty()) {

Copilot AI Sep 25, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition checks if the pattern string is empty, but an empty string should likely be handled differently than creating a pattern with an empty string. Consider checking for null or whitespace-only strings instead.

Suggested change
if (pattern.isEmpty()) {
if (pattern == null || pattern.trim().isEmpty()) {

Copilot uses AI. Check for mistakes.
} else {
isDbNameCoveredByPattern = treePattern.coversDb(databaseName);
isDbNameCoveredByPattern =
treePatterns.stream().allMatch(treePattern -> treePattern.coversDb(databaseName));

Copilot AI Sep 25, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using allMatch() here means the database is only considered covered if ALL patterns cover it. This seems incorrect - the database should be covered if ANY pattern covers it. Consider using anyMatch() instead.

Suggested change
treePatterns.stream().allMatch(treePattern -> treePattern.coversDb(databaseName));
treePatterns.stream().anyMatch(treePattern -> treePattern.coversDb(databaseName));

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

} else {
isDbNameCoveredByPattern = treePattern.coversDb(databaseName);
isDbNameCoveredByPattern =
treePatterns.stream().allMatch(treePattern -> treePattern.coversDb(databaseName));

Copilot AI Sep 25, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as in PipeRealtimeDataRegionSource - using allMatch() here means the database is only considered covered if ALL patterns cover it. This should likely be anyMatch() to match if any pattern covers the database.

Suggested change
treePatterns.stream().allMatch(treePattern -> treePattern.coversDb(databaseName));
treePatterns.stream().anyMatch(treePattern -> treePattern.coversDb(databaseName));

Copilot uses AI. Check for mistakes.
@VGalaxies VGalaxies requested a review from Copilot September 25, 2025 08:23

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 36 out of 36 changed files in this pull request and generated 3 comments.


Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

private static List<TreePattern> parseMultiplePatterns(
final String pattern, final Function<String, TreePattern> patternSupplier) {
if (pattern.isEmpty()) {
return Collections.singletonList(patternSupplier.apply(pattern));

Copilot AI Sep 25, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition will cause empty patterns to be processed as valid patterns. An empty pattern should likely be filtered out or handled differently to avoid creating meaningless pattern matchers.

Suggested change
return Collections.singletonList(patternSupplier.apply(pattern));
// Return an empty list if the pattern is empty, to avoid creating meaningless pattern matchers.
return Collections.emptyList();

Copilot uses AI. Check for mistakes.
Comment on lines +114 to +116
Objects.isNull(treePatterns)
|| ((treePatterns.stream()
.allMatch(treePattern -> treePattern == null || treePattern.isRoot()))

Copilot AI Sep 25, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nested null checks and stream operations make this logic complex. Consider extracting this into a private helper method like areAllPatternsRootOrNull() for better readability.

Copilot uses AI. Check for mistakes.
}
return initEventParser().convertToTablet();
// TODO: handle multiple patterns
return initEventParsers().get(0).convertToTablet();

Copilot AI Sep 25, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using get(0) without checking if the list is empty will cause an IndexOutOfBoundsException. Add a null/empty check or handle the case where no patterns exist.

Suggested change
return initEventParsers().get(0).convertToTablet();
List<TabletInsertionEventParser> parsers = initEventParsers();
if (parsers == null || parsers.isEmpty()) {
// Handle the case where no parser exists; return null or throw an exception as appropriate
return null;
}
return parsers.get(0).convertToTablet();

Copilot uses AI. Check for mistakes.
} else {
isDbNameCoveredByPattern = treePattern.coversDb(databaseName);
isDbNameCoveredByPattern =
treePatterns.stream().allMatch(treePattern -> treePattern.coversDb(databaseName));

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

return parseConfigPlan(
((PipeConfigRegionWritePlanEvent) event).getConfigPhysicalPlan(),
treePattern,
// TODO: handle multiple patterns

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should complete it

Comment on lines 414 to 429
public Iterable<TabletInsertionEvent> processRowByRow(
final BiConsumer<Row, RowCollector> consumer) {
return initEventParser().processRowByRow(consumer);
return initEventParsers().stream()
.map(tabletInsertionEventParser -> tabletInsertionEventParser.processRowByRow(consumer))
.flatMap(Collection::stream)
.collect(Collectors.toList());
}

@Override
public Iterable<TabletInsertionEvent> processTablet(
final BiConsumer<Tablet, RowCollector> consumer) {
return initEventParser().processTablet(consumer);
return initEventParsers().stream()
.map(tabletInsertionEventParser -> tabletInsertionEventParser.processTablet(consumer))
.flatMap(Collection::stream)
.collect(Collectors.toList());
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will an insertion be sent twice if I define patterns like:
root.db1.d1.s1
root.db1.d1.*

Comment on lines +621 to +656
final Iterator<TsFileInsertionEventParser> parserIterator = initEventParsers().iterator();
return new Iterator<TabletInsertionEvent>() {
private TsFileInsertionEventParser currentParser = null;
private Iterator<TabletInsertionEvent> currentEventIterator = Collections.emptyIterator();

private void closeCurrentParser() {
if (Objects.nonNull(currentParser)) {
currentParser.close();
currentParser = null;
}
}

@Override
public boolean hasNext() {
while (!currentEventIterator.hasNext() && parserIterator.hasNext()) {
closeCurrentParser();
currentParser = parserIterator.next();
currentEventIterator = currentParser.toTabletInsertionEvents().iterator();
}

if (!currentEventIterator.hasNext()) {
closeCurrentParser();
}

return currentEventIterator.hasNext();
}

@Override
public TabletInsertionEvent next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return currentEventIterator.next();
}
};
};

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest that you put patterns inside the parser, instead of each parser for one pattern, because:

  1. overlapped patterns, like root.db1.** and root.db1.d1.**, may produce redundant results;
  2. each pattern may result in a traverse in a TsFile, which could be inefficient.

@VGalaxies VGalaxies closed this Oct 13, 2025
@VGalaxies VGalaxies deleted the multi-path branch October 21, 2025 02:41
@VGalaxies VGalaxies restored the multi-path branch October 21, 2025 02:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants