
[FLINK-39401] Extend raw format to support line-delimiter option #27897

Merged
RocMarshal merged 4 commits into apache:master from featzhang:feature/raw-format-line-delimiter on Apr 17, 2026

@featzhang
Member

Summary

This PR extends the raw format to support a new optional raw.line-delimiter config option.

When raw.line-delimiter is set:

  • Deserialization: each incoming message is decoded to a string using raw.charset, split by the delimiter (String.split(Pattern.quote(delimiter), -1)), and one RowData is emitted per segment via deserialize(byte[], Collector<T>).
  • Serialization: the delimiter bytes are appended to the serialized value.

When raw.line-delimiter is not set, all existing behavior is preserved exactly (backward compatible).
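
The split and append behavior described above can be sketched in plain Java. This is a minimal illustration, not the code from the PR: the class name `RawLineDelimiterSketch` and its helper methods are hypothetical, rows are modeled as plain strings rather than RowData, and encoding the delimiter with the same charset as the value is an assumption.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class RawLineDelimiterSketch {

    // Decode the message with the configured charset, then split on the literal delimiter.
    static List<String> split(byte[] message, Charset charset, String delimiter) {
        String decoded = new String(message, charset);
        // Pattern.quote makes the delimiter literal (so "||" is not read as regex alternation);
        // the -1 limit keeps trailing empty segments.
        return Arrays.asList(decoded.split(Pattern.quote(delimiter), -1));
    }

    // Encode one value and append the delimiter bytes (assumed to use the same charset).
    static byte[] serialize(String value, Charset charset, String delimiter) {
        byte[] body = value.getBytes(charset);
        byte[] tail = delimiter.getBytes(charset);
        byte[] out = Arrays.copyOf(body, body.length + tail.length);
        System.arraycopy(tail, 0, out, body.length, tail.length);
        return out;
    }

    public static void main(String[] args) {
        Charset utf8 = StandardCharsets.UTF_8;
        System.out.println(split("line1\nline2\nline3".getBytes(utf8), utf8, "\n")); // [line1, line2, line3]
        System.out.println(split("a||b||c".getBytes(utf8), utf8, "||"));             // [a, b, c]
        System.out.println(new String(serialize("hello", utf8, "||"), utf8));        // hello||
    }
}
```

Note that with a limit of -1, a message that ends with the delimiter yields one extra empty trailing segment.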

Example SQL

CREATE TABLE nginx_log (log STRING) WITH (
  'connector'          = 'kafka',
  'topic'              = 'nginx_log',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'             = 'raw',
  'raw.line-delimiter' = '\n'
);
-- Each Kafka message "line1\nline2\nline3" produces 3 rows
SELECT log FROM nginx_log;

Changes

| Class | Change |
| --- | --- |
| RawFormatOptions | Add LINE_DELIMITER ConfigOption<String> with no default value |
| RawFormatFactory | Read new option; pass to schema builders; register in optionalOptions() |
| RawFormatDeserializationSchema | Override deserialize(byte[], Collector) to split by delimiter when set; add lineDelimiter field to equals/hashCode |
| RawFormatSerializationSchema | Append delimiter bytes when set; add lineDelimiter field to equals/hashCode |
| RawFormatFactoryTest | Add testLineDelimiterOption() |
| RawFormatLineDelimiterTest | New test class: 9 tests covering splitting, GBK charset, null message, serialization appending, null row |

Test Plan

  • RawFormatLineDelimiterTest (9 tests):
    • No delimiter → single row (regression)
    • \n delimiter → 3 rows
    • Custom multi-char delimiter || → 3 rows
    • Null message with delimiter → 0 rows
    • GBK charset with \n delimiter → correct splitting
    • Serialization without delimiter → no change
    • Serialization with \n → appends \n
    • Serialization with || → appends ||
    • Serialization of null row → returns null
  • RawFormatFactoryTest.testLineDelimiterOption(): verifies factory produces schemas with correct delimiter
  • All 50 existing raw format tests continue to pass

@flinkbot
Collaborator

flinkbot commented Apr 5, 2026

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@featzhang featzhang changed the title from "[FLINK] Extend raw format to support line-delimiter option" to "[FLINK-39401] Extend raw format to support line-delimiter option" Apr 5, 2026
@rmetzger
Contributor

rmetzger commented Apr 8, 2026

@featzhang Thanks for this contribution. The commit message in the PR should include the Jira ticket id.

@github-actions github-actions (bot) added the community-reviewed label ("PR has been reviewed by the community.") Apr 8, 2026
@featzhang featzhang force-pushed the feature/raw-format-line-delimiter branch from 8205f0c to 3161013 Compare April 11, 2026 16:27
@featzhang
Member Author

Thanks for the thorough review @spuru9 and @rmetzger!

Here's what was addressed in this update:

  1. Commit message: Updated to include the Jira ticket ID: [FLINK-39401][formats].

  2. Round-trip compatibility fix (serializer/deserializer incompatibility): The deserializer now strips the single trailing empty string produced when a message ends with the delimiter. This makes serialization and deserialization fully round-trip compatible — serialize("hello") → "hello\n" → deserialize → ["hello"] (1 row, not 2). Added testDeserializeTrailingDelimiter_noExtraRow and testRoundTrip_serializeThenDeserialize tests to verify this.

  3. Pre-compiled Pattern: Pattern.quote(lineDelimiter) is now pre-compiled once in the constructor as a Pattern lineDelimiterPattern field, eliminating repeated compilation on every message.

  4. Pre-computed delimiterBytes: delimiterBytes is now computed once in the constructor and stored as a field, eliminating repeated allocation on every row serialization.

  5. Documentation: Added raw.line-delimiter option to both docs/content/docs/connectors/table/formats/raw.md and docs/content.zh/docs/connectors/table/formats/raw.md, including a note on round-trip compatibility.
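
Items 2 to 4 above can be sketched together as follows. This is an illustrative sketch under stated assumptions, not the merged implementation: the class `RoundTripSketch` is hypothetical, rows are modeled as strings, and the handling of an empty message (kept as one empty row) is an assumption rather than something stated in this thread.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class RoundTripSketch {
    private final Charset charset;
    private final Pattern lineDelimiterPattern; // compiled once, reused for every message
    private final byte[] delimiterBytes;        // computed once, reused for every row

    RoundTripSketch(String lineDelimiter, Charset charset) {
        this.charset = charset;
        this.lineDelimiterPattern = Pattern.compile(Pattern.quote(lineDelimiter));
        this.delimiterBytes = lineDelimiter.getBytes(charset);
    }

    byte[] serialize(String value) {
        byte[] body = value.getBytes(charset);
        byte[] out = Arrays.copyOf(body, body.length + delimiterBytes.length);
        System.arraycopy(delimiterBytes, 0, out, body.length, delimiterBytes.length);
        return out;
    }

    List<String> deserialize(byte[] message) {
        String[] parts = lineDelimiterPattern.split(new String(message, charset), -1);
        int n = parts.length;
        // Drop only the single trailing empty segment caused by a terminating delimiter.
        if (n > 1 && parts[n - 1].isEmpty()) {
            n--;
        }
        return new ArrayList<>(Arrays.asList(parts).subList(0, n));
    }

    public static void main(String[] args) {
        RoundTripSketch schema = new RoundTripSketch("\n", StandardCharsets.UTF_8);
        System.out.println(schema.deserialize(schema.serialize("hello"))); // [hello]
    }
}
```

Only one trailing empty segment is removed, so in this sketch a message such as "a\n\n" still yields a second, empty row.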

@featzhang featzhang force-pushed the feature/raw-format-line-delimiter branch from 3161013 to bb33242 Compare April 12, 2026 09:38
Contributor

@spuru9 spuru9 left a comment

Thank you for the change.
LGTM +1
@rmetzger Feel free to give a write approval or another review.

Contributor

@RocMarshal RocMarshal left a comment

Thanks @featzhang for the contribution.
Left some comments.
PTAL~

@RocMarshal RocMarshal self-assigned this Apr 15, 2026
@featzhang featzhang force-pushed the feature/raw-format-line-delimiter branch from bb33242 to b219d10 Compare April 16, 2026 01:52
@featzhang
Member Author

featzhang commented Apr 16, 2026

Review Comments Addressed

✅ Fixed comments (from @RocMarshal)

| # | File | Line | Fix | Status |
| --- | --- | --- | --- | --- |
| 1 | RawFormatLineDelimiterTest.java | 61 | Use StandardCharsets.UTF_8 instead of string literal "UTF-8" | |
| 2 | RawFormatLineDelimiterTest.java | 63 | Simplify assertion using hasToString() | |
| 3 | RawFormatDeserializationSchema.java | 68 | Add note that lineDelimiter and lineDelimiterPattern are either both null or both non-null | |
| 4 | RawFormatDeserializationSchema.java | 131 | Design explanation for null message handling (see below) | 💬 |

📁 Modified files

  1. RawFormatLineDelimiterTest.java: Added StandardCharsets import, replaced 7 string literals, simplified 1 assertion using hasToString()
  2. RawFormatDeserializationSchema.java: Added nullability relationship comment for lineDelimiter and lineDelimiterPattern fields

💬 Design note for line 131 (null message with delimiter)

The current behavior (returning early on null message when a delimiter is set) is intentional and differs from the single-record path for the following reasons:

  • When lineDelimiter is set, the semantics shift from "one message → one row" to "one message → N rows via splitting". A null message in this context means there is no content to split, so emitting zero rows is the correct and expected result.
  • Emitting a row with a null field for a null message would be inconsistent with the splitting contract: if the delimiter is \n and the message is null, what does it mean to split null by \n? Producing a null row conflates "no data" with "one empty/null segment".
  • The existing test testDeserializeNullMessageWithDelimiter_noRows explicitly documents and verifies this design decision.
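
The two null-handling paths contrasted above might look like this in a simplified form. The sketch is hypothetical: `NullMessageSketch` is not a Flink class, a `Consumer<String>` stands in for Flink's Collector, UTF-8 is hard-coded, and the single-record path emitting one row with a null field is inferred from the discussion rather than copied from the source.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Pattern;

public class NullMessageSketch {

    // 'out' stands in for the collector that receives emitted rows.
    static void deserialize(byte[] message, String lineDelimiter, Consumer<String> out) {
        if (lineDelimiter == null) {
            // Single-record path: one message -> one row, even when the message is null (assumed).
            out.accept(message == null ? null : new String(message, StandardCharsets.UTF_8));
            return;
        }
        if (message == null) {
            // Splitting path: there is nothing to split, so zero rows are emitted.
            return;
        }
        String decoded = new String(message, StandardCharsets.UTF_8);
        for (String segment : decoded.split(Pattern.quote(lineDelimiter), -1)) {
            out.accept(segment);
        }
    }

    public static void main(String[] args) {
        List<String> withDelimiter = new ArrayList<>();
        deserialize(null, "\n", withDelimiter::add);
        System.out.println(withDelimiter.size()); // 0

        List<String> withoutDelimiter = new ArrayList<>();
        deserialize(null, null, withoutDelimiter::add);
        System.out.println(withoutDelimiter.size()); // 1
    }
}
```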

@RocMarshal PTAL, thanks!

@featzhang
Member Author

Thanks @RocMarshal for pointing that out! I've now gone through the entire RawFormatLineDelimiterTest.java file and applied the same fixes consistently:

  • Replaced all "UTF-8" string literals with StandardCharsets.UTF_8 (in .getBytes() calls) and StandardCharsets.UTF_8.name() (in constructor parameters)
  • Replaced all .getString(0).toString()).isEqualTo("...") assertions with .getString(0)).hasToString("...")
  • Added the missing import java.nio.charset.StandardCharsets;

The fix covers all test methods: testDeserializeWithoutDelimiter_singleRow, testDeserializeWithNewlineDelimiter_multipleRows, testDeserializeWithCustomMultiCharDelimiter, testDeserializeWithNullMessage_noOutput, testDeserializeWithGbkCharset, all serialization tests, testDeserializeTrailingDelimiter_noExtraRow, and testRoundTrip_serializeThenDeserialize.
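
For readers unfamiliar with the first replacement, here is a small standalone illustration of why the constant is preferred over the string literal. The class `CharsetConstantSketch` and its methods are hypothetical and unrelated to the test file itself; the AssertJ hasToString() change is not shown because it needs a third-party dependency.

```java
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CharsetConstantSketch {

    // String-literal form: the charset name is looked up at runtime and a checked exception must be handled.
    static byte[] viaLiteral(String s) {
        try {
            return s.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    // Constant form: no lookup by name and no checked exception.
    static byte[] viaConstant(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // For APIs that still take a charset name, such as a String constructor parameter.
    static String charsetName() {
        return StandardCharsets.UTF_8.name();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.equals(viaLiteral("a\nb"), viaConstant("a\nb"))); // true
        System.out.println(charsetName()); // UTF-8
    }
}
```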

@RocMarshal PTAL, thanks!

Contributor

@RocMarshal RocMarshal left a comment

Thanks @featzhang for the update.
BTW, it would be ideal if the update summary replied point by point in the review comments and were a bit more concise. That way, the reviewer wouldn't need to break things down one by one and locate the corresponding changes to verify them.

LGTM +1 if the CI passes.

Looking forward to your next contribution~

@featzhang
Member Author

The CI failure in the latest build (#74134) is unrelated to this PR.

The failing tests are:

  • SqlGatewayOpenRestAPIDocsCompletenessITCase#testSqlGatewayRestApiDocsUpToDate
  • RuntimeOpenRestAPIDocsCompletenessITCase#testRuntimeRestApiDocsUpToDate

These tests check that rest_v1_sql_gateway.yml and rest_v1_dispatcher.yml are up-to-date. The failures are caused by recent changes on the master branch that modified REST API endpoints (e.g., [FLINK-38893], [FLINK-38894], [FLINK-38895]), which updated the generated REST docs after this PR's branch diverged. This is a known flaky issue in long-lived PRs and has nothing to do with the raw format changes in this PR.

The code changes in this PR only touch:

  • flink-table/flink-table-runtime module (raw format source files and tests)
  • docs/content (raw format documentation)

@flinkbot run azure

@RocMarshal
Contributor

Hi, @featzhang The CI failure was fixed in https://issues.apache.org/jira/browse/FLINK-39457.
Could you rebase this branch onto the latest master?
Thx

…tion

- Use StandardCharsets.UTF_8 instead of string literals in tests
- Simplify assertions using hasToString() method
- Add documentation for field nullability relationship

Resolves review comments from @RocMarshal
@featzhang featzhang force-pushed the feature/raw-format-line-delimiter branch from 8f7dea3 to 2742e22 Compare April 16, 2026 12:56
@featzhang
Member Author

> Hi, @featzhang The CI failure was fixed in https://issues.apache.org/jira/browse/FLINK-39457.
>
> Could you rebase this branch onto the latest master?
>
> Thx

I have fixed it by rebasing onto the latest master branch, and CI has passed.

@RocMarshal
Contributor

Thanks @featzhang .
Merging...

@RocMarshal RocMarshal merged commit 2379582 into apache:master Apr 17, 2026
@featzhang featzhang deleted the feature/raw-format-line-delimiter branch April 17, 2026 03:18
@snuyanzin
Contributor

@featzhang , @RocMarshal any reason config option was added without FLIP/ML discussion?
asking since if we look at main FLIP page (section What is considered a "major change" that needs a FLIP?) https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

All of the following are public interfaces that people build around:

DataStream, DataSet, SQL and Table API, including classes related to that, such as StreamExecutionEnvironment

Classes marked with the @Public annotation

On-disk binary formats, such as checkpoints/savepoints

User-facing scripts/command-line tools, i.e. bin/flink, Yarn scripts, Kubernetes scripts

Configuration settings

Exposed monitoring information

@featzhang
Member Author

> @featzhang , @RocMarshal any reason config option was added without FLIP/ML discussion? asking since if we look at main FLIP page (section What is considered a "major change" that needs a FLIP?) https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>
> All of the following are public interfaces that people build around:
>
> DataStream, DataSet, SQL and Table API, including classes related to that, such as StreamExecutionEnvironment
>
> Classes marked with the @Public annotation
>
> On-disk binary formats, such as checkpoints/savepoints
>
> User-facing scripts/command-line tools, i.e. bin/flink, Yarn scripts, Kubernetes scripts
>
> Configuration settings
>
> Exposed monitoring information

Hi @snuyanzin, thanks for raising this concern — and apologies for not initiating the discussion beforehand.

The raw.line-delimiter option is:

  • Optional with no default — existing behavior is entirely unchanged when the option is absent.
  • Additive-only — it adds a new capability without modifying any existing code paths.
  • Format-scoped — it is isolated to the raw format connector and does not touch any @Public API or SQL/Table API surface.

My understanding (which may be wrong) was that minor additive options within a format connector fall below the FLIP threshold, similar to how many other format options (e.g., csv.field-delimiter, json.ignore-parse-errors) were introduced without a FLIP.

That said, I fully respect the community's position. If the consensus is that this option should have gone through FLIP/ML discussion, I'm happy to:

  1. Open a FLIP retroactively to formally document the design rationale, or
  2. Revert the change if the community feels it was premature.

What would you recommend, @snuyanzin and @RocMarshal?

@RocMarshal
Contributor

Thanks @snuyanzin for the attention and reminder, @featzhang for the response.

Please let me add some information from my perspective at the time of reviewing the PR, and sorry for not raising it earlier, which may have caused confusion:

I had also considered whether we needed a new FLIP to track this parameter. However, based on the following reasoning, I proceeded with the merge:

  • A: As @featzhang mentioned, this new parameter belongs to a sub-component or a secondary dimension.
  • B: Following from point A, I looked for recent, similar changes ([1], [2]) and checked whether they introduced new FLIPs. Since none of them did, I followed their development and review pattern, reviewed accordingly, and merged this PR.

That said, if the introduction of this parameter feels too abrupt, I’m happy to revert it immediately. We can then reintroduce this change later, strictly following the specification you outlined.

To ensure consistency, I've incorporated this into my review checklist. Going forward, I'll follow the specification closely to avoid similar issues.

Any input is appreciated!

[1]https://issues.apache.org/jira/browse/FLINK-38372
[2]

@snuyanzin
Contributor

I don't think we need to revert here; however, it would be great to have more visibility, e.g. via the ML or release notes.

@RocMarshal
Contributor

Thanks @snuyanzin .
That's reasonable.
I'll add it into the 2.3 release note.
