[Improve](streaming-job) support specifying offset for StreamingInsertJob create and alter#62490
JNSimba wants to merge 7 commits into apache:master
…tJob create and alter Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… alter offset test
- CDC ALTER only allows JSON specific offset (reject named modes like initial/latest)
- ALTER offset uses PROPERTIES('offset'='{"file":"xxx","pos":"yyy"}') syntax
- Update regression cases to use PROPERTIES for ALTER
- Add cdc_stream TVF ALTER offset regression test
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
run buildall
…urceType().name() Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
/review
Pull request overview
This PR extends StreamingInsertJob CDC offset support so users can specify named offsets (initial/snapshot/latest/earliest) and JSON “specific offsets” when creating jobs via FROM MYSQL/POSTGRES, and JSON offsets when altering jobs (with validation and BE parsing support).
Changes:
- FE: extend source config validation to accept additional offset modes and JSON offsets; wire `validateSource` to be data-source-type aware; allow CDC jobs to use the `offset` property and restrict ALTER to JSON offsets for CDC.
- FE: implement/adjust JDBC offset property deserialization and improve `JdbcOffset` behaviors (`isValidOffset`, `toSerializedJson`).
- BE: add PostgreSQL JSON LSN offset support and handle the `SPECIFIC_OFFSETS` startup mode.
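The validation rules listed in this overview can be illustrated with a minimal Java sketch. It is not the PR's `DataSourceConfigValidator`: the class `OffsetModeCheck`, its method names, the case-insensitive matching, and the brace-based JSON detection are all assumptions made for illustration. Only the list of named modes, the MySQL-only status of `earliest`, and the JSON-only rule for CDC ALTER come from the PR text.

```java
import java.util.Locale;
import java.util.Set;

/** Hypothetical stand-in for a source-type-aware offset check; not the PR's code. */
final class OffsetModeCheck {
    enum SourceType { MYSQL, POSTGRES }

    // Named startup modes listed in the PR overview.
    private static final Set<String> NAMED_MODES =
            Set.of("initial", "snapshot", "latest", "earliest");

    /** Crude JSON-object detection (assumption); a real validator would parse the value. */
    static boolean looksLikeJsonObject(String value) {
        String v = value.trim();
        return v.length() >= 2 && v.startsWith("{") && v.endsWith("}");
    }

    /** Returns true when the offset value is acceptable on CREATE for the given source. */
    static boolean isValidCreateOffset(SourceType type, String offset) {
        if (offset == null || offset.trim().isEmpty()) {
            return false;
        }
        if (looksLikeJsonObject(offset)) {
            return true; // the JSON shape itself is not checked in this sketch
        }
        String mode = offset.trim().toLowerCase(Locale.ROOT);
        if (!NAMED_MODES.contains(mode)) {
            return false;
        }
        // Per the PR summary, "earliest" is MySQL-only.
        return !(mode.equals("earliest") && type == SourceType.POSTGRES);
    }

    /** On ALTER, CDC jobs accept only a JSON specific offset, never a named mode. */
    static boolean isValidAlterOffset(String offset) {
        return offset != null && looksLikeJsonObject(offset);
    }
}
```

Under these assumptions, `isValidCreateOffset(SourceType.POSTGRES, "earliest")` is rejected while the same call for `SourceType.MYSQL` is accepted, and `isValidAlterOffset("latest")` is rejected because ALTER takes only JSON.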
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.
Summary per file:
| File | Description |
|---|---|
| regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql_alter_offset.groovy | Adds TVF-path regression coverage for ALTER with JSON MySQL binlog offset. |
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy | Adds PG regression coverage for create offsets + ALTER with JSON LSN, plus invalid cases. |
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset_restart_fe.groovy | Adds FE-restart regression coverage for MySQL JSON binlog offset persistence. |
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.groovy | Adds MySQL regression coverage for earliest/latest/JSON offsets and ALTER behavior. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java | Adds JSON LSN startup offset handling for PostgreSQL. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java | Handles SPECIFIC_OFFSETS startup mode by constructing offset from config. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java | Passes source type into source config validation for offset-type-specific rules. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java | Passes data source type into source config validation during ALTER. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java | Implements offset property deserialization for named modes and JSON. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java | Adjusts offset validity and serialization behavior. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Enables offset property validation beyond S3 and restricts CDC ALTER to JSON offsets. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java | Adds data-source-type-aware offset validation and JSON detection helper. |
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java (outdated, resolved)
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java (outdated, resolved)
...nt/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java (outdated, resolved)
...rc/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java (resolved)
...st/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset_restart_fe.groovy (resolved)
Found 1 blocking issue.
- High: `JdbcOffset.toSerializedJson()` now serializes JDBC offsets in a different shape from what `deserializeOffset()` still reads. For binlog offsets the new JSON nests the fields one level down, but the deserialize/replay path still expects a flattened list of string maps. The serialized value is persisted immediately after the offset is reset, so cloud replay / offset restore can no longer round-trip the altered CDC offset correctly.
Critical checkpoints:
- Goal of task: Support special offsets on create/alter. Partially achieved; create/alter paths were extended and regression tests were added, but the new serialized offset format breaks persisted/replayed CDC offsets in cloud mode.
- Modification size/focus: Focused overall.
- Concurrency: No new concurrency concerns identified in the touched code paths.
- Lifecycle/static init: No special lifecycle or static-init issues identified.
- Configuration: No new configs added.
- Compatibility/storage format: Blocking issue. The PR changes serialized JDBC offset shape without updating the corresponding deserialize/replay path.
- Parallel code paths: Checked both job paths; the persistence problem sits in shared JDBC offset serialization.
- Special conditional checks: The new ALTER restriction to JSON offsets is understandable and commented.
- Test coverage: New regression tests cover MySQL/Postgres create/alter and one FE-restart scenario, but they do not cover cloud-mode reset/replay of an altered JSON offset.
- Observability: Existing logs seem adequate for this path.
- Transaction/persistence: Problematic because offset reset/replay persists a format the current reader does not consume.
- Data write/atomicity: No direct new write-atomicity issue beyond the persisted offset corruption risk.
- FE-BE variable passing: No new FE-BE field propagation issues identified.
- Performance: No material performance regressions identified in the reviewed diff.
- Other issues: None beyond the blocking serialization mismatch.
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java (resolved)
…vider, fix serialization
- Add validateAlterOffset() to SourceOffsetProvider interface, CDC impl
rejects non-JSON offset, S3 impl allows any format (default no-op)
- StreamingInsertJob delegates ALTER offset validation to provider
- AlterJobCommand.validateProps calls validateAlterOffset before validateOffset
- Restore S3-only guard in initInsertJob for PROPERTIES offset
- Fix JdbcOffset.toSerializedJson() to produce flat format compatible
with deserializeOffset(): [{"split_id":"binlog-split","file":"xxx","pos":"yyy"}]
- Remove unnecessary ts_usec padding in PostgresSourceReader JSON offset
- Remove instanceof check in modifyPropertiesInternal (validation moved to AlterJobCommand)
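The flat format named in this commit message can be illustrated with a small Java sketch. The class `FlatOffsetJson` and its `toFlatJson` helper are hypothetical and use only the JDK, so they are not the PR's `JdbcOffset.toSerializedJson()`. Only the target shape `[{"split_id":"binlog-split","file":"xxx","pos":"yyy"}]` is taken from the commit message.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that renders one split as a flat list of string maps. */
final class FlatOffsetJson {
    /** Escapes backslashes and double quotes; enough for this sketch's simple values. */
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Puts split_id first, then the offset fields at the same level (no nesting). */
    static String toFlatJson(String splitId, Map<String, String> offsetFields) {
        Map<String, String> flat = new LinkedHashMap<>();
        flat.put("split_id", splitId);
        flat.putAll(offsetFields);
        StringJoiner json = new StringJoiner(",", "[{", "}]");
        for (Map.Entry<String, String> e : flat.entrySet()) {
            json.add(quote(e.getKey()) + ":" + quote(e.getValue()));
        }
        return json.toString();
    }
}
```

Passing a `LinkedHashMap` with `file` and `pos` keeps the field order stable, which makes the output easy to compare against whatever the deserialize path expects.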
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
/review
run buildall
…izeOffsetProperty Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Found 2 blocking correctness issues.
- FE accepts any JSON offset on ALTER for JDBC jobs. `JdbcSourceOffsetProvider.validateAlterOffset()` and `deserializeOffsetProperty()` only gate on `isJsonOffset()`, so a PostgreSQL job can accept MySQL-style JSON like `{"file":"binlog.000001","pos":"154"}` (and vice versa). ALTER succeeds, but the resumed job then fails in the broker because `PostgresSourceReader.generatePostgresConfig()` requires `lsn`.
- ALTER on the `FROM MYSQL/POSTGRES ... TO` path does not persist the new startup offset into `sourceProperties`. `modifyPropertiesInternal()` updates only the in-memory provider, while replay/reconstruction uses `sourceProperties` (for example in `gsonPostProcess()` and when creating the next multi-table task). If FE restarts before the first post-ALTER commit, the altered offset is lost and the job resumes from the old mode/offset.
Critical checkpoint conclusions:
- Goal of current task: Partially accomplished. The PR adds special-offset support, but the ALTER path is still incorrect in the two scenarios above, so the feature is not correct end-to-end.
- Modification size/clarity: Mostly focused, but the ALTER validation/persistence logic is split in a way that misses source-specific constraints and durable state updates.
- Concurrency: No new lock-order or thread-safety issue found in the touched code.
- Lifecycle/static initialization: No special lifecycle or static-init issue found.
- Configuration items: No new config items added.
- Compatibility: No incompatible wire/storage format change identified.
- Parallel code paths: The CREATE path and downstream readers enforce source-specific semantics, but the ALTER path does not; that inconsistency is the root cause of one blocker.
- Special conditional checks: The new JSON-only check is too broad and should be source-type-aware.
- Test coverage: New regression tests cover many happy paths, but they do not cover invalid cross-source JSON on ALTER or FE restart before the first post-ALTER commit on the `FROM ... TO` path.
- Observability: Existing logging looks sufficient to diagnose these failures.
- Transaction/persistence: There is a persistence bug for altered offsets on the `FROM ... TO` path because replay reconstructs from stale `sourceProperties`.
- Data writes/modifications: The restart case can resume consumption from the wrong offset, so data correctness is at risk.
- FE/BE variable passing: No new propagation issue beyond the stale `sourceProperties` state above.
- Performance: No material performance regression found in the touched code.
- Other issues: None beyond the two blocking findings above.
        return null;
    }

    @Override
This only checks whether the value is JSON, not whether the JSON is valid for the current source type. As written, a PostgreSQL job will accept MySQL-style JSON like {"file":"binlog.000001","pos":"154"} here, because deserializeOffsetProperty() also wraps any JSON object in a BinlogSplit. The ALTER succeeds, but the resumed job later fails in PostgresSourceReader.generatePostgresConfig() because that path requires an lsn. Please validate the JSON shape against sourceType here (MySQL: file + pos; PostgreSQL: lsn) instead of treating every JSON object as valid.
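The shape check requested in this comment could look roughly like the following Java sketch. It assumes the JSON offset has already been parsed into a `Map<String, String>`; the class `OffsetShapeCheck`, its nested `SourceType` enum, and the error message wording are hypothetical. Only the required keys (MySQL: `file` and `pos`; PostgreSQL: `lsn`) come from the review comment.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical source-type-aware shape check for an already-parsed JSON offset. */
final class OffsetShapeCheck {
    enum SourceType { MYSQL, POSTGRES }

    /** Keys each source type needs, as stated in the review comment. */
    static List<String> requiredKeys(SourceType type) {
        switch (type) {
            case MYSQL:
                return List.of("file", "pos");
            case POSTGRES:
                return List.of("lsn");
            default:
                throw new IllegalArgumentException("Unsupported source type: " + type);
        }
    }

    /** Throws if any key required for the source type is missing or empty. */
    static void validate(SourceType type, Map<String, String> offset) {
        for (String key : requiredKeys(type)) {
            String value = offset.get(key);
            if (value == null || value.isEmpty()) {
                throw new IllegalArgumentException(
                        "Offset for " + type + " must contain '" + key + "'");
            }
        }
    }
}
```

With this check, a PostgreSQL job given `{"file":"binlog.000001","pos":"154"}` fails at ALTER time with a message naming the missing `lsn` key, rather than failing later when the job resumes.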
    if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())
            && S3TableValuedFunction.NAME.equalsIgnoreCase(this.tvfType)) {
    if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())) {
        Offset offset = validateOffset(inputStreamProps.getOffsetProperty());
For FROM MYSQL/POSTGRES ... TO jobs this only updates the in-memory provider. The durable state used by replay/reconstruction is still sourceProperties (gsonPostProcess() rebuilds new JdbcSourceOffsetProvider(getJobId(), dataSourceType, sourceProperties) from that map, and createStreamingMultiTblTask() also forwards it to BE). If FE restarts before the next successful commit, the altered offset is lost and the job resumes from the old mode/offset. Please also sync the new offset into sourceProperties for the non-TVF JDBC path before the ALTER is journaled.
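The restart hazard described in this comment can be modeled with a toy Java sketch. Everything here is hypothetical: the class `AlterOffsetSync`, the `replay()` method that stands in for an FE restart, and the use of `"offset"` as the key inside the durable map are assumptions, not the PR's `StreamingInsertJob` code. The point is only that the ALTER must write to the durable map as well as to the in-memory state.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model: durable sourceProperties plus an in-memory provider offset. */
final class AlterOffsetSync {
    private final Map<String, String> sourceProperties; // stands in for journaled state
    private String providerOffset;                      // in-memory only

    AlterOffsetSync(Map<String, String> sourceProperties) {
        this.sourceProperties = new HashMap<>(sourceProperties);
        // The provider is rebuilt from the durable map, as on replay.
        this.providerOffset = this.sourceProperties.get("offset");
    }

    /** Updates the in-memory provider AND the durable map in one step. */
    void alterOffset(String newOffset) {
        providerOffset = newOffset;
        sourceProperties.put("offset", newOffset); // "offset" key is an assumption
    }

    /** Simulates a restart: state is reconstructed from sourceProperties alone. */
    AlterOffsetSync replay() {
        return new AlterOffsetSync(sourceProperties);
    }

    String currentOffset() {
        return providerOffset;
    }
}
```

If `alterOffset` skipped the `sourceProperties.put` call, `replay().currentOffset()` would return the pre-ALTER value, which is the bug the comment describes.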
run buildall
Summary
- Support specifying an offset when creating and altering a StreamingInsertJob on the `FROM MYSQL/POSTGRES` path.
- Extend `DataSourceConfigValidator` to validate offset formats (initial/snapshot/latest/earliest/JSON); `earliest` is MySQL-only and is rejected for PostgreSQL. Implement `JdbcSourceOffsetProvider.deserializeOffsetProperty()` for named modes and JSON offset parsing. Remove the S3-only restriction in `StreamingInsertJob.initInsertJob()` and `modifyPropertiesInternal()` so CDC jobs can also use the offset property. On ALTER, sync the offset to `sourceProperties` for the FROM...TO path.
- Support the JSON LSN offset `{"lsn":"N"}` in `PostgresSourceReader.generatePostgresConfig()`, and handle `SPECIFIC_OFFSETS` mode in `JdbcIncrementalSourceReader.getStartOffsetFromConfig()`.
- Fix `JdbcOffset.isValidOffset()` and `toSerializedJson()` to return meaningful values instead of hardcoded false/null.

Test plan
- `test_streaming_mysql_job_special_offset`: earliest/latest/JSON binlog offset with data sync verification, ALTER JOB offset change, invalid format rejection
- `test_streaming_postgres_job_special_offset`: initial/latest with data sync, ALTER JOB with JSON LSN offset and data sync verification, earliest rejection for PG, invalid format rejection
- `test_streaming_mysql_job_special_offset_restart_fe`: create job with JSON binlog offset, verify data sync, restart FE, verify job recovery and continued sync

🤖 Generated with Claude Code