-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-39392][table] Support conditional traits for PTFs #27886
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8e8700d
c0e2242
cacf1f0
aa4f50c
2c04ce1
f1fe2f2
608d255
3fd9bd3
d054743
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.flink.table.types.inference; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.function.Predicate; | ||
|
|
||
| /** | ||
| * Internal value-comparable wrapper used by all built-in {@link TraitCondition} factories. Equality | ||
| * is keyed by {@code kind + args}; the {@code impl} predicate is reused but never compared, so two | ||
| * conditions built from the same factory inputs are equal. | ||
| * | ||
| * <p>Lives outside {@link TraitCondition} because Java forbids {@code private} nested types in | ||
| * interfaces (they are implicitly {@code public static}); top-level package-private gives the same | ||
| * encapsulation. | ||
| */ | ||
| final class BuiltInCondition implements TraitCondition { | ||
|
|
||
| /** Tag identifying which factory produced the condition. */ | ||
| enum Kind { | ||
| HAS_PARTITION_BY, | ||
| ARG_IS_EQUAL_TO, | ||
| NOT | ||
| } | ||
|
|
||
| private final Kind kind; | ||
| private final List<Object> args; | ||
| private final Predicate<TraitContext> impl; | ||
|
|
||
| BuiltInCondition(final Kind kind, final List<Object> args, final Predicate<TraitContext> impl) { | ||
| this.kind = kind; | ||
| this.args = args; | ||
| this.impl = impl; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean test(final TraitContext ctx) { | ||
| return impl.test(ctx); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(final Object o) { | ||
| if (this == o) { | ||
| return true; | ||
| } | ||
| if (!(o instanceof BuiltInCondition)) { | ||
| return false; | ||
| } | ||
| final BuiltInCondition that = (BuiltInCondition) o; | ||
| return kind == that.kind && args.equals(that.args); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(kind, args); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,10 +31,13 @@ | |
|
|
||
| import javax.annotation.Nullable; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.EnumSet; | ||
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.Optional; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
|
|
||
| /** | ||
| * Describes an argument in a static signature that is not overloaded and does not support varargs. | ||
|
|
@@ -57,18 +60,30 @@ public class StaticArgument { | |
| private final @Nullable Class<?> conversionClass; | ||
| private final boolean isOptional; | ||
| private final EnumSet<StaticArgumentTrait> traits; | ||
| private final List<ConditionalTrait> conditionalTraits; | ||
|
|
||
| private StaticArgument( | ||
| String name, | ||
| @Nullable DataType dataType, | ||
| @Nullable Class<?> conversionClass, | ||
| boolean isOptional, | ||
| EnumSet<StaticArgumentTrait> traits) { | ||
| this(name, dataType, conversionClass, isOptional, traits, List.of()); | ||
| } | ||
|
|
||
| private StaticArgument( | ||
| String name, | ||
| @Nullable DataType dataType, | ||
| @Nullable Class<?> conversionClass, | ||
| boolean isOptional, | ||
| EnumSet<StaticArgumentTrait> traits, | ||
| List<ConditionalTrait> conditionalTraits) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if a list is the most suitable data structure for the traits here. Maybe representing as a HashMap would make more sense. With a list we could still allow the user to define two conditional traits with the same StaticArgument.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've given this some though and decided we could go with OR semantics for multiple conditional traits for the same trait. This makes it possible that the user writes multiple simple condition traits and thus a list makes sense. the trait is activated if any of its conditions is met. I've documented the behavior |
||
| this.name = Preconditions.checkNotNull(name, "Name must not be null."); | ||
| this.dataType = dataType; | ||
| this.conversionClass = conversionClass; | ||
| this.isOptional = isOptional; | ||
| this.traits = Preconditions.checkNotNull(traits, "Traits must not be null."); | ||
| this.conditionalTraits = conditionalTraits; | ||
| checkName(); | ||
| checkTraits(traits); | ||
| checkOptionalType(); | ||
|
|
@@ -196,6 +211,84 @@ public boolean is(StaticArgumentTrait trait) { | |
| return traits.contains(trait); | ||
| } | ||
|
|
||
| /** | ||
| * Context-aware trait check. Evaluates conditional trait rules against the given context to | ||
| * determine the effective traits. | ||
| */ | ||
| public boolean is(StaticArgumentTrait trait, TraitContext ctx) { | ||
| return resolveTraits(ctx).contains(trait); | ||
| } | ||
|
|
||
| /** | ||
| * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is | ||
| * added to the effective trait set when the condition evaluates to {@code true} at planning | ||
| * time. Only non-root traits (subtraits of TABLE, SCALAR, or MODEL) are allowed. | ||
| * | ||
| * <p>Multiple conditions for the same trait use OR semantics: the trait is activated if any of | ||
| * its conditions is met. | ||
| * | ||
| * <p>Example: | ||
| * | ||
| * <pre>{@code | ||
| * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES)) | ||
| * .withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy()); | ||
| * }</pre> | ||
| */ | ||
| public StaticArgument withConditionalTrait( | ||
| final StaticArgumentTrait trait, final TraitCondition condition) { | ||
| if (trait.isRoot()) { | ||
| throw new IllegalArgumentException( | ||
| "Root traits (SCALAR, TABLE, MODEL) cannot be conditional."); | ||
| } | ||
| final List<ConditionalTrait> accumulated = new ArrayList<>(this.conditionalTraits); | ||
| accumulated.add(new ConditionalTrait(condition, trait)); | ||
| return new StaticArgument(name, dataType, conversionClass, isOptional, traits, accumulated); | ||
| } | ||
|
|
||
| /** Whether this argument has conditional trait rules. */ | ||
| public boolean hasConditionalTraits() { | ||
| return !conditionalTraits.isEmpty(); | ||
| } | ||
|
|
||
| /** Whether any conditional trait rule may add the given trait. */ | ||
| public boolean hasConditionalTrait(final StaticArgumentTrait trait) { | ||
| return conditionalTraits.stream().anyMatch(c -> c.trait == trait); | ||
| } | ||
|
|
||
| /** | ||
| * Returns a new {@link StaticArgument} with conditional traits resolved against the given | ||
| * context. The returned argument has the effective traits baked in and no conditional rules. | ||
| */ | ||
| public StaticArgument applyConditionalTraits(final TraitContext ctx) { | ||
| if (conditionalTraits.isEmpty()) { | ||
| return this; | ||
| } | ||
| return new StaticArgument(name, dataType, conversionClass, isOptional, resolveTraits(ctx)); | ||
| } | ||
|
|
||
| /** | ||
| * Resolves effective traits by evaluating conditional rules against the context. Returns the | ||
| * base traits combined with any conditional traits whose conditions are met. | ||
| */ | ||
| public EnumSet<StaticArgumentTrait> resolveTraits(final TraitContext ctx) { | ||
| if (conditionalTraits.isEmpty()) { | ||
| return traits; | ||
| } | ||
| final EnumSet<StaticArgumentTrait> resolved = EnumSet.copyOf(traits); | ||
| for (final ConditionalTrait conditionalTrait : conditionalTraits) { | ||
| if (conditionalTrait.condition.test(ctx)) { | ||
| removeMutuallyExclusiveTraits(resolved, conditionalTrait.trait); | ||
| resolved.add(conditionalTrait.trait); | ||
| } | ||
| } | ||
| return resolved; | ||
| } | ||
|
|
||
| private static void removeMutuallyExclusiveTraits( | ||
| final EnumSet<StaticArgumentTrait> traits, final StaticArgumentTrait adding) { | ||
| traits.removeAll(adding.getIncompatibleWith()); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
|
gustavodemorais marked this conversation as resolved.
|
||
| final StringBuilder s = new StringBuilder(); | ||
|
|
@@ -210,11 +303,13 @@ public String toString() { | |
| s.append(dataType); | ||
| } | ||
| if (!traits.equals(EnumSet.of(StaticArgumentTrait.SCALAR))) { | ||
| final Stream<String> baseTraitNames = | ||
| traits.stream().map(Enum::name).map(n -> n.replace('_', ' ')); | ||
| final Stream<String> conditionalTraitNames = | ||
| conditionalTraits.stream().map(c -> c.trait.name().replace('_', ' ')); | ||
| s.append(" "); | ||
| s.append( | ||
| traits.stream() | ||
| .map(Enum::name) | ||
| .map(n -> n.replace('_', ' ')) | ||
| Stream.concat(baseTraitNames, conditionalTraitNames) | ||
| .collect(Collectors.joining(", ", "{", "}"))); | ||
| } | ||
| return s.toString(); | ||
|
|
@@ -233,12 +328,13 @@ public boolean equals(Object o) { | |
| && Objects.equals(name, that.name) | ||
| && Objects.equals(dataType, that.dataType) | ||
| && Objects.equals(conversionClass, that.conversionClass) | ||
| && Objects.equals(traits, that.traits); | ||
| && Objects.equals(traits, that.traits) | ||
| && Objects.equals(conditionalTraits, that.conditionalTraits); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(name, dataType, conversionClass, isOptional, traits); | ||
| return Objects.hash(name, dataType, conversionClass, isOptional, traits, conditionalTraits); | ||
| } | ||
|
|
||
| private void checkName() { | ||
|
|
@@ -354,4 +450,32 @@ private void checkModelNotOptional() { | |
| throw new ValidationException("Model arguments must not be optional."); | ||
| } | ||
| } | ||
|
|
||
| /** A trait that is conditionally added based on a {@link TraitCondition}. */ | ||
| private static final class ConditionalTrait { | ||
| private final TraitCondition condition; | ||
| private final StaticArgumentTrait trait; | ||
|
|
||
| ConditionalTrait(final TraitCondition condition, final StaticArgumentTrait trait) { | ||
| this.condition = condition; | ||
| this.trait = trait; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(final Object o) { | ||
| if (this == o) { | ||
| return true; | ||
| } | ||
| if (o == null || getClass() != o.getClass()) { | ||
| return false; | ||
| } | ||
| final ConditionalTrait that = (ConditionalTrait) o; | ||
| return Objects.equals(condition, that.condition) && trait == that.trait; | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(condition, trait); | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.