Skip to content

[Improve](streaming-job) support specifying offset for StreamingInsertJob create and alter#62490

Open
JNSimba wants to merge 7 commits intoapache:masterfrom
JNSimba:improve_special_offset
Open

[Improve](streaming-job) support specifying offset for StreamingInsertJob create and alter#62490
JNSimba wants to merge 7 commits intoapache:masterfrom
JNSimba:improve_special_offset

Conversation

@JNSimba
Copy link
Copy Markdown
Member

@JNSimba JNSimba commented Apr 14, 2026

Summary

  • Support specifying offset (binlog position, LSN, named modes) when creating or altering a StreamingInsertJob via FROM MYSQL/POSTGRES path.
  • FE: extend DataSourceConfigValidator to validate offset formats (initial/snapshot/latest/earliest/JSON); earliest is MySQL-only, rejected for PostgreSQL. Implement JdbcSourceOffsetProvider.deserializeOffsetProperty() for named modes and JSON offset parsing. Remove S3-only restriction in StreamingInsertJob.initInsertJob() and modifyPropertiesInternal() so CDC jobs can also use offset property. On ALTER, sync offset to sourceProperties for the FROM...TO path.
  • BE: support JSON LSN offset {"lsn":"N"} in PostgresSourceReader.generatePostgresConfig(), and handle SPECIFIC_OFFSETS mode in JdbcIncrementalSourceReader.getStartOffsetFromConfig().
  • Fix JdbcOffset.isValidOffset() and toSerializedJson() to return meaningful values instead of hardcoded false/null.

Test plan

  • Regression test: test_streaming_mysql_job_special_offset — earliest/latest/JSON binlog offset with data sync verification, ALTER JOB offset change, invalid format rejection
  • Regression test: test_streaming_postgres_job_special_offset — initial/latest with data sync, ALTER JOB with JSON LSN offset and data sync verification, earliest rejection for PG, invalid format rejection
  • Regression test: test_streaming_mysql_job_special_offset_restart_fe — create job with JSON binlog offset, verify data sync, restart FE, verify job recovery and continued sync

🤖 Generated with Claude Code

…tJob create and alter

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Thearas
Copy link
Copy Markdown
Contributor

Thearas commented Apr 14, 2026

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

JNSimba and others added 2 commits April 14, 2026 19:14
… alter offset test

- CDC ALTER only allows JSON specific offset (reject named modes like initial/latest)
- ALTER offset uses PROPERTIES('offset'='{"file":"xxx","pos":"yyy"}') syntax
- Update regression cases to use PROPERTIES for ALTER
- Add cdc_stream TVF ALTER offset regression test

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 14, 2026

run buildall

…urceType().name()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba JNSimba requested a review from Copilot April 15, 2026 02:24
@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 15, 2026

/review

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR extends StreamingInsertJob CDC offset support so users can specify named offsets (initial/snapshot/latest/earliest) and JSON “specific offsets” when creating jobs via FROM MYSQL/POSTGRES, and JSON offsets when altering jobs (with validation and BE parsing support).

Changes:

  • FE: extend source config validation to accept additional offset modes and JSON offsets; wire validateSource to be data-source-type aware; allow CDC jobs to use offset property and restrict ALTER to JSON offsets for CDC.
  • FE: implement/adjust JDBC offset property deserialization and improve JdbcOffset behaviors (isValidOffset, toSerializedJson).
  • BE: add PostgreSQL JSON LSN offset support and handle SPECIFIC_OFFSETS startup mode.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql_alter_offset.groovy Adds TVF-path regression coverage for ALTER with JSON MySQL binlog offset.
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy Adds PG regression coverage for create offsets + ALTER with JSON LSN, plus invalid cases.
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset_restart_fe.groovy Adds FE-restart regression coverage for MySQL JSON binlog offset persistence.
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.groovy Adds MySQL regression coverage for earliest/latest/JSON offsets and ALTER behavior.
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java Adds JSON LSN startup offset handling for PostgreSQL.
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java Handles SPECIFIC_OFFSETS startup mode by constructing offset from config.
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java Passes source type into source config validation for offset-type-specific rules.
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java Passes data source type into source config validation during ALTER.
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java Implements offset property deserialization for named modes and JSON.
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java Adjusts offset validity and serialization behavior.
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java Enables offset property validation beyond S3 and restricts CDC ALTER to JSON offsets.
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java Adds data-source-type-aware offset validation and JSON detection helper.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Copy Markdown
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found 1 blocking issue.

  1. High - now serializes JDBC offsets in a different shape from what still reads. For binlog offsets the new JSON nests fields under , but the deserialize/replay path still expects a flattened list of string maps. persists this serialized value immediately after , so cloud replay / offset restore can no longer round-trip the altered CDC offset correctly.

Critical checkpoints:

  • Goal of task: Support special offsets on create/alter. Partially achieved; create/alter paths were extended and regression tests were added, but the new serialized offset format breaks persisted/replayed CDC offsets in cloud mode.
  • Modification size/focus: Focused overall.
  • Concurrency: No new concurrency concerns identified in the touched code paths.
  • Lifecycle/static init: No special lifecycle or static-init issues identified.
  • Configuration: No new configs added.
  • Compatibility/storage format: Blocking issue. The PR changes serialized JDBC offset shape without updating the corresponding deserialize/replay path.
  • Parallel code paths: Checked both and job paths; the persistence problem sits in shared JDBC offset serialization.
  • Special conditional checks: The new ALTER restriction to JSON offsets is understandable and commented.
  • Test coverage: New regression tests cover MySQL/Postgres create/alter and one FE-restart scenario, but they do not cover cloud-mode reset/replay of an altered JSON offset.
  • Observability: Existing logs seem adequate for this path.
  • Transaction/persistence: Problematic because offset reset/replay persists a format the current reader does not consume.
  • Data write/atomicity: No direct new write-atomicity issue beyond the persisted offset corruption risk.
  • FE-BE variable passing: No new FE-BE field propagation issues identified.
  • Performance: No material performance regressions identified in the reviewed diff.
  • Other issues: None beyond the blocking serialization mismatch.

…vider, fix serialization

- Add validateAlterOffset() to SourceOffsetProvider interface, CDC impl
  rejects non-JSON offset, S3 impl allows any format (default no-op)
- StreamingInsertJob delegates ALTER offset validation to provider
- AlterJobCommand.validateProps calls validateAlterOffset before validateOffset
- Restore S3-only guard in initInsertJob for PROPERTIES offset
- Fix JdbcOffset.toSerializedJson() to produce flat format compatible
  with deserializeOffset(): [{"split_id":"binlog-split","file":"xxx","pos":"yyy"}]
- Remove unnecessary ts_usec padding in PostgresSourceReader JSON offset
- Remove instanceof check in modifyPropertiesInternal (validation moved to AlterJobCommand)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 15, 2026

/review

@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 15, 2026

run buildall

…izeOffsetProperty

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found 2 blocking correctness issues.

  1. FE accepts any JSON offset on ALTER for JDBC jobs. JdbcSourceOffsetProvider.validateAlterOffset() and deserializeOffsetProperty() only gate on isJsonOffset(), so a PostgreSQL job can accept MySQL-style JSON like {"file":"binlog.000001","pos":"154"} (and vice versa). ALTER succeeds, but the resumed job then fails in the broker because PostgresSourceReader.generatePostgresConfig() requires lsn.
  2. ALTER on the FROM MYSQL/POSTGRES ... TO path does not persist the new startup offset into sourceProperties. modifyPropertiesInternal() updates only the in-memory provider, while replay/reconstruction uses sourceProperties (for example in gsonPostProcess() and when creating the next multi-table task). If FE restarts before the first post-ALTER commit, the altered offset is lost and the job resumes from the old mode/offset.

Critical checkpoint conclusions:

  • Goal of current task: Partially accomplished. The PR adds special-offset support, but the ALTER path is still incorrect in the two scenarios above, so the feature is not correct end-to-end.
  • Modification size/clarity: Mostly focused, but the ALTER validation/persistence logic is split in a way that misses source-specific constraints and durable state updates.
  • Concurrency: No new lock-order or thread-safety issue found in the touched code.
  • Lifecycle/static initialization: No special lifecycle or static-init issue found.
  • Configuration items: No new config items added.
  • Compatibility: No incompatible wire/storage format change identified.
  • Parallel code paths: The CREATE path and downstream readers enforce source-specific semantics, but the ALTER path does not; that inconsistency is the root cause of one blocker.
  • Special conditional checks: The new JSON-only check is too broad and should be source-type-aware.
  • Test coverage: New regression tests cover many happy paths, but they do not cover invalid cross-source JSON on ALTER or FE restart before the first post-ALTER commit on the FROM ... TO path.
  • Observability: Existing logging looks sufficient to diagnose these failures.
  • Transaction/persistence: There is a persistence bug for altered offsets on the FROM ... TO path because replay reconstructs from stale sourceProperties.
  • Data writes/modifications: The restart case can resume consumption from the wrong offset, so data correctness is at risk.
  • FE/BE variable passing: No new propagation issue beyond the stale sourceProperties state above.
  • Performance: No material performance regression found in the touched code.
  • Other issues: None beyond the two blocking findings above.

return null;
}

@Override
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only checks whether the value is JSON, not whether the JSON is valid for the current source type. As written, a PostgreSQL job will accept MySQL-style JSON like {"file":"binlog.000001","pos":"154"} here, because deserializeOffsetProperty() also wraps any JSON object in a BinlogSplit. The ALTER succeeds, but the resumed job later fails in PostgresSourceReader.generatePostgresConfig() because that path requires an lsn. Please validate the JSON shape against sourceType here (MySQL: file + pos; PostgreSQL: lsn) instead of treating every JSON object as valid.

if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())
&& S3TableValuedFunction.NAME.equalsIgnoreCase(this.tvfType)) {
if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())) {
Offset offset = validateOffset(inputStreamProps.getOffsetProperty());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For FROM MYSQL/POSTGRES ... TO jobs this only updates the in-memory provider. The durable state used by replay/reconstruction is still sourceProperties (gsonPostProcess() rebuilds new JdbcSourceOffsetProvider(getJobId(), dataSourceType, sourceProperties) from that map, and createStreamingMultiTblTask() also forwards it to BE). If FE restarts before the next successful commit, the altered offset is lost and the job resumes from the old mode/offset. Please also sync the new offset into sourceProperties for the non-TVF JDBC path before the ALTER is journaled.

@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 15, 2026

run buildall

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants