Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions docs/content/docs/sql/reference/queries/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,18 +154,18 @@ This is useful when you need to materialize changelog events into a downstream s

```sql
SELECT * FROM TO_CHANGELOG(
input => TABLE source_table,
input => TABLE source_table [PARTITION BY key_col],
[op => DESCRIPTOR(op_column_name),]
[op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
)
```

### Parameters

| Parameter | Required | Description |
|:-------------|:---------|:------------|
| `input` | Yes | The input table. Accepts insert-only, retract, and upsert tables. |
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. |
| Parameter | Required | Description |
|:-------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key should match or be a subset of the upsert key of the subquery. |
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. |
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. |

#### Default op_mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.StaticArgument;
import org.apache.flink.table.types.inference.StaticArgumentTrait;
import org.apache.flink.table.types.inference.TraitCondition;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.inference.strategies.ArrayOfStringArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
Expand Down Expand Up @@ -785,22 +786,22 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.name("TO_CHANGELOG")
.kind(PROCESS_TABLE)
.staticArguments(
// Row semantics (no PARTITION BY). Accepts updating
// inputs. The planner inserts ChangelogNormalize for
// upsert sources to produce UPDATE_BEFORE and full
// DELETE rows.
// Row semantics (no PARTITION BY).
// With PARTITION BY, switches to set
// semantics for co-located parallel execution.
StaticArgument.table(
"input",
Row.class,
false,
EnumSet.of(
StaticArgumentTrait.TABLE,
StaticArgumentTrait.ROW_SEMANTIC_TABLE,
StaticArgumentTrait.SUPPORT_UPDATES,
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE,
// Not strictly necessary but explicitly state that
// we require full deletes.
StaticArgumentTrait.REQUIRE_FULL_DELETE)),
"input",
Row.class,
false,
EnumSet.of(
StaticArgumentTrait.TABLE,
Comment thread
gustavodemorais marked this conversation as resolved.
StaticArgumentTrait.ROW_SEMANTIC_TABLE,
StaticArgumentTrait.SUPPORT_UPDATES,
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE,
StaticArgumentTrait.REQUIRE_FULL_DELETE))
.withConditionalTrait(
StaticArgumentTrait.SET_SEMANTIC_TABLE,
TraitCondition.hasPartitionBy()),
StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
StaticArgument.scalar(
"op_mapping",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference;

import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;

/**
* Internal value-comparable wrapper used by all built-in {@link TraitCondition} factories. Equality
* is keyed by {@code kind + args}; the {@code impl} predicate is reused but never compared, so two
* conditions built from the same factory inputs are equal.
*
* <p>Lives outside {@link TraitCondition} because Java forbids {@code private} nested types in
* interfaces (they are implicitly {@code public static}); top-level package-private gives the same
* encapsulation.
*/
final class BuiltInCondition implements TraitCondition {

/** Tag identifying which factory produced the condition. */
enum Kind {
HAS_PARTITION_BY,
ARG_IS_EQUAL_TO,
NOT
}

private final Kind kind;
private final List<Object> args;
private final Predicate<TraitContext> impl;

BuiltInCondition(final Kind kind, final List<Object> args, final Predicate<TraitContext> impl) {
this.kind = kind;
this.args = args;
this.impl = impl;
}

@Override
public boolean test(final TraitContext ctx) {
return impl.test(ctx);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof BuiltInCondition)) {
return false;
}
final BuiltInCondition that = (BuiltInCondition) o;
return kind == that.kind && args.equals(that.args);
}

@Override
public int hashCode() {
return Objects.hash(kind, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Describes an argument in a static signature that is not overloaded and does not support varargs.
Expand All @@ -57,18 +60,30 @@ public class StaticArgument {
private final @Nullable Class<?> conversionClass;
private final boolean isOptional;
private final EnumSet<StaticArgumentTrait> traits;
private final List<ConditionalTrait> conditionalTraits;

private StaticArgument(
String name,
@Nullable DataType dataType,
@Nullable Class<?> conversionClass,
boolean isOptional,
EnumSet<StaticArgumentTrait> traits) {
this(name, dataType, conversionClass, isOptional, traits, List.of());
}

private StaticArgument(
String name,
@Nullable DataType dataType,
@Nullable Class<?> conversionClass,
boolean isOptional,
EnumSet<StaticArgumentTrait> traits,
List<ConditionalTrait> conditionalTraits) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if a list is the most suitable data structure for the traits here. Maybe representing as a HashMap would make more sense. With a list we could still allow the user to define two conditional traits with the same StaticArgument.
[StaticArgumentTrait.SET_SEMANTIC_TABLE, hasPartitionBy, StaticArgumentTrait.SET_SEMANTIC_TABLE, hasSomeCondition]

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've given this some though and decided we could go with OR semantics for multiple conditional traits for the same trait. This makes it possible that the user writes multiple simple condition traits and thus a list makes sense. the trait is activated if any of its conditions is met. I've documented the behavior

this.name = Preconditions.checkNotNull(name, "Name must not be null.");
this.dataType = dataType;
this.conversionClass = conversionClass;
this.isOptional = isOptional;
this.traits = Preconditions.checkNotNull(traits, "Traits must not be null.");
this.conditionalTraits = conditionalTraits;
checkName();
checkTraits(traits);
checkOptionalType();
Expand Down Expand Up @@ -196,6 +211,84 @@ public boolean is(StaticArgumentTrait trait) {
return traits.contains(trait);
}

/**
* Context-aware trait check. Evaluates conditional trait rules against the given context to
* determine the effective traits.
*/
public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
return resolveTraits(ctx).contains(trait);
}

/**
* Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
* added to the effective trait set when the condition evaluates to {@code true} at planning
* time. Only non-root traits (subtraits of TABLE, SCALAR, or MODEL) are allowed.
*
* <p>Multiple conditions for the same trait use OR semantics: the trait is activated if any of
* its conditions is met.
*
* <p>Example:
*
* <pre>{@code
* StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
* .withConditionalTrait(SET_SEMANTIC_TABLE, hasPartitionBy());
* }</pre>
*/
public StaticArgument withConditionalTrait(
final StaticArgumentTrait trait, final TraitCondition condition) {
if (trait.isRoot()) {
throw new IllegalArgumentException(
"Root traits (SCALAR, TABLE, MODEL) cannot be conditional.");
}
final List<ConditionalTrait> accumulated = new ArrayList<>(this.conditionalTraits);
accumulated.add(new ConditionalTrait(condition, trait));
return new StaticArgument(name, dataType, conversionClass, isOptional, traits, accumulated);
}

/** Whether this argument has conditional trait rules. */
public boolean hasConditionalTraits() {
return !conditionalTraits.isEmpty();
}

/** Whether any conditional trait rule may add the given trait. */
public boolean hasConditionalTrait(final StaticArgumentTrait trait) {
return conditionalTraits.stream().anyMatch(c -> c.trait == trait);
}

/**
* Returns a new {@link StaticArgument} with conditional traits resolved against the given
* context. The returned argument has the effective traits baked in and no conditional rules.
*/
public StaticArgument applyConditionalTraits(final TraitContext ctx) {
if (conditionalTraits.isEmpty()) {
return this;
}
return new StaticArgument(name, dataType, conversionClass, isOptional, resolveTraits(ctx));
}

/**
* Resolves effective traits by evaluating conditional rules against the context. Returns the
* base traits combined with any conditional traits whose conditions are met.
*/
public EnumSet<StaticArgumentTrait> resolveTraits(final TraitContext ctx) {
if (conditionalTraits.isEmpty()) {
return traits;
}
final EnumSet<StaticArgumentTrait> resolved = EnumSet.copyOf(traits);
for (final ConditionalTrait conditionalTrait : conditionalTraits) {
if (conditionalTrait.condition.test(ctx)) {
removeMutuallyExclusiveTraits(resolved, conditionalTrait.trait);
resolved.add(conditionalTrait.trait);
}
}
return resolved;
}

private static void removeMutuallyExclusiveTraits(
final EnumSet<StaticArgumentTrait> traits, final StaticArgumentTrait adding) {
traits.removeAll(adding.getIncompatibleWith());
}

@Override
public String toString() {
Comment thread
gustavodemorais marked this conversation as resolved.
final StringBuilder s = new StringBuilder();
Expand All @@ -210,11 +303,13 @@ public String toString() {
s.append(dataType);
}
if (!traits.equals(EnumSet.of(StaticArgumentTrait.SCALAR))) {
final Stream<String> baseTraitNames =
traits.stream().map(Enum::name).map(n -> n.replace('_', ' '));
final Stream<String> conditionalTraitNames =
conditionalTraits.stream().map(c -> c.trait.name().replace('_', ' '));
s.append(" ");
s.append(
traits.stream()
.map(Enum::name)
.map(n -> n.replace('_', ' '))
Stream.concat(baseTraitNames, conditionalTraitNames)
.collect(Collectors.joining(", ", "{", "}")));
}
return s.toString();
Expand All @@ -233,12 +328,13 @@ public boolean equals(Object o) {
&& Objects.equals(name, that.name)
&& Objects.equals(dataType, that.dataType)
&& Objects.equals(conversionClass, that.conversionClass)
&& Objects.equals(traits, that.traits);
&& Objects.equals(traits, that.traits)
&& Objects.equals(conditionalTraits, that.conditionalTraits);
}

@Override
public int hashCode() {
return Objects.hash(name, dataType, conversionClass, isOptional, traits);
return Objects.hash(name, dataType, conversionClass, isOptional, traits, conditionalTraits);
}

private void checkName() {
Expand Down Expand Up @@ -354,4 +450,32 @@ private void checkModelNotOptional() {
throw new ValidationException("Model arguments must not be optional.");
}
}

/** A trait that is conditionally added based on a {@link TraitCondition}. */
private static final class ConditionalTrait {
private final TraitCondition condition;
private final StaticArgumentTrait trait;

ConditionalTrait(final TraitCondition condition, final StaticArgumentTrait trait) {
this.condition = condition;
this.trait = trait;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ConditionalTrait that = (ConditionalTrait) o;
return Objects.equals(condition, that.condition) && trait == that.trait;
}

@Override
public int hashCode() {
return Objects.hash(condition, trait);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.annotation.PublicEvolving;

import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -51,6 +53,8 @@ public enum StaticArgumentTrait {
REQUIRE_UPDATE_BEFORE(SUPPORT_UPDATES),
REQUIRE_FULL_DELETE(SUPPORT_UPDATES);

private static final Set<StaticArgumentTrait> ROOTS = EnumSet.of(SCALAR, TABLE, MODEL);

private final Set<StaticArgumentTrait> requirements;

StaticArgumentTrait(StaticArgumentTrait... requirements) {
Expand All @@ -60,4 +64,24 @@ public enum StaticArgumentTrait {
public Set<StaticArgumentTrait> getRequirements() {
return requirements;
}

/** Whether this trait is one of the top-level roots (SCALAR, TABLE, MODEL). */
public boolean isRoot() {
return ROOTS.contains(this);
}

/**
* Returns the traits that are mutually exclusive with this one. Adding this trait to a set
* implies removing all returned traits. Empty by default.
*/
public Set<StaticArgumentTrait> getIncompatibleWith() {
switch (this) {
case SET_SEMANTIC_TABLE:
return Collections.singleton(ROW_SEMANTIC_TABLE);
case ROW_SEMANTIC_TABLE:
return Collections.singleton(SET_SEMANTIC_TABLE);
default:
return Collections.emptySet();
}
}
}
Loading