[feature](RoutineLoad) Support Amazon Kinesis #61325

Open
0AyanamiRei wants to merge 58 commits into apache:master from 0AyanamiRei:feature-routineload-AWS_Kinesis

@0AyanamiRei
Contributor

@0AyanamiRei 0AyanamiRei commented Mar 14, 2026

What problem does this PR solve?

Issue Number: close #xxx

Related PR: this PR should be merged after #62184

Problem Summary:

Support Amazon Kinesis as a data source for routine load.

CREATE ROUTINE LOAD [db_name.]job_name ON table_name
[load_properties]
[job_properties]
FROM KINESIS
(
    "aws.region" = "your_region",
    "kinesis_stream" = "your_stream_name",
    "aws.access_key" = "your_access_key",
    "aws.secret_key" = "your_secret_key"
);
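
A minimal sketch of how the four data source properties in the statement above could be checked before a job is created. The class and method names are hypothetical and are not taken from this PR, and treating all four keys as mandatory is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the PR: checks a FROM KINESIS property map.
final class KinesisSourceProperties {
    // Keys copied from the statement above; treating all four as required is an assumption.
    static final List<String> REQUIRED_KEYS = List.of(
            "aws.region", "kinesis_stream", "aws.access_key", "aws.secret_key");

    private KinesisSourceProperties() {}

    // Returns the required keys that are absent or blank, in declaration order.
    static List<String> missingKeys(Map<String, String> props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("aws.region", "us-east-1");
        props.put("kinesis_stream", "orders");
        // Prints the keys that still have to be supplied.
        System.out.println(missingKeys(props));
    }
}
```

Returning the full list of missing keys, rather than failing on the first one, lets a single error message name everything the user still has to supply.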

Comparison of AWS Kinesis and Kafka concepts:

Kinesis stream <=> Kafka topic
Kinesis shard <=> Kafka partition

Doc PR: apache/doris-website#3521

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@0AyanamiRei
Contributor Author

run buildall

@hello-stephen
Contributor

Cloud UT Coverage Report

Increment line coverage 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 79.24% (1798/2269)
Line Coverage 64.56% (32298/50026)
Region Coverage 65.44% (16166/24702)
Branch Coverage 55.88% (8615/15416)

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 1.15% (10/866) 🎉
Increment coverage report
Complete coverage report

Comment thread gensrc/thrift/BackendService.thrift Outdated
@liaoxin01
Contributor

/review

Contributor

@github-actions github-actions Bot left a comment

I found 3 correctness issues in this PR.

  1. be/src/load/stream_load/stream_load_executor.cpp: the Kinesis error path does not reset ctx->kinesis_info->cmt_sequence_number when plan execution fails. KinesisDataConsumerGroup::start_all() has already copied the last consumed sequence numbers into the context before the fragment/txn result is known, so a failed attempt can leave advanced progress in memory for the retried task. Kafka explicitly rewinds here; Kinesis needs the same protection to avoid skipping records after an aborted batch.

  2. fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisRoutineLoadJob.java: cloud mode is not blocked, but the cloud routine-load transaction path is still Kafka-only. RoutineLoadManager can create Kinesis jobs in cloud mode, KinesisTaskInfo sets cloudCluster, and then TxnUtil.rlTaskTxnCommitAttachmentToPb() still casts attachment.getProgress() to KafkaProgress. The first cloud Kinesis commit will therefore fail with a ClassCastException instead of persisting progress.

  3. regression-test/conf/regression-conf.groovy: the new load_p0/kinesis_routine_load directory is added to excludeDirectories, so the entire new regression suite is skipped by default. That leaves the feature effectively untested in CI even though the PR adds many Kinesis cases.
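
The retry-safety concern in item 1 can be illustrated with a small sketch. The actual BE code is C++; the Java below is only a language-neutral illustration of one way to keep an aborted attempt from advancing progress, and every name in it (`TaskProgress`, `recordConsumed`, `onCommit`, `onFailure`, `resumeAfter`) is hypothetical rather than taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the per-task load context; the real BE code is C++.
final class TaskProgress {
    // shard id -> last sequence number covered by a committed transaction
    private final Map<String, String> committed = new HashMap<>();
    // shard id -> last sequence number read by the current, not yet committed attempt
    private final Map<String, String> pending = new HashMap<>();

    // Called by the consumer as records are read; never touches committed state.
    void recordConsumed(String shardId, String sequenceNumber) {
        pending.put(shardId, sequenceNumber);
    }

    // Called only after the transaction commits: promote the attempt's progress.
    void onCommit() {
        committed.putAll(pending);
        pending.clear();
    }

    // Called when plan execution or the transaction fails: discard the attempt.
    void onFailure() {
        pending.clear();
    }

    // Position a retried task resumes after, or null if nothing was committed for the shard.
    String resumeAfter(String shardId) {
        return committed.get(shardId);
    }

    public static void main(String[] args) {
        TaskProgress progress = new TaskProgress();
        progress.recordConsumed("shardId-000000000000", "100");
        progress.onCommit();
        progress.recordConsumed("shardId-000000000000", "250");
        progress.onFailure(); // aborted batch: "250" must not survive
        System.out.println(progress.resumeAfter("shardId-000000000000"));
    }
}
```

Keeping consumed and committed positions in separate maps means the failure path only has to drop the pending map, so a retried task cannot start past records that were never committed.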

Critical checkpoint conclusions:

  • Goal of the task / correctness / proof: The PR clearly aims to add end-to-end Kinesis routine-load support, but the current code does not fully achieve that because retry safety and cloud-mode behavior are still broken. Tests were added, but the regression config currently excludes them.
  • Small / clear / focused: Not fully. This is a broad cross-layer feature (FE, BE, thrift/proto, persistence, tests), so the risk is naturally high and missing parity with existing Kafka paths matters.
  • Concurrency: The feature introduces new consumer-group and concurrent progress-tracking paths. I did not find a primary lock-order bug in the reviewed hunks, but the retry/reset issue shows the lifecycle between consumer progress and transaction failure is not yet safe.
  • Lifecycle / initialization: No static initialization issue identified in the reviewed code.
  • Configuration items: New Kinesis properties are added. I did not validate dynamic-config behavior because these are routine-load job properties rather than mutable process configs.
  • Compatibility changes: Yes. New thrift/proto enums and structs are introduced. The most concrete compatibility/runtime gap I found is the cloud transaction attachment path remaining Kafka-specific.
  • Parallel code paths: Yes. Kafka already resets progress on failure and has a complete cloud progress path; Kinesis needs the same treatment.
  • Special conditional checks: The explicit Config.isCloudMode() branches in Kinesis code currently lead to incomplete behavior and should either be implemented fully or rejected early.
  • Test coverage: Functional tests were added, including restart/error scenarios, but they are excluded in regression-conf.groovy, so effective coverage is currently missing.
  • Observability: Basic logs are present; no major observability blocker found in the reviewed paths.
  • Transaction / persistence: Yes, this feature touches transaction commit attachments and persisted progress. The cloud attachment serialization path is currently incorrect for Kinesis.
  • Data write / modification: Yes. Retry safety is not guaranteed because failed attempts can retain advanced Kinesis progress.
  • FE/BE variable passing: The new FE/BE Kinesis progress structures are mostly wired, but cloud-specific passing is incomplete.
  • Performance: I did not identify a primary performance regression worth blocking on relative to the correctness issues above.
  • Other issues: No additional blocker beyond the three findings above.

Because of the issues above, I cannot consider the current implementation correct yet.
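
The cloud-mode failure described above comes down to an unconditional downcast. The sketch below reproduces that failure mode with simplified stand-in classes and contrasts it with a type-dispatching alternative. All class, field, and method names here are hypothetical; the real `KafkaProgress`, `KinesisProgress`, and `TxnUtil.rlTaskTxnCommitAttachmentToPb()` have different shapes, and the dispatching variant is only one possible fix.

```java
// Simplified stand-ins for the FE progress classes; fields and methods are hypothetical.
abstract class ProgressSketch {}

final class KafkaProgressSketch extends ProgressSketch {
    final long offset;

    KafkaProgressSketch(long offset) {
        this.offset = offset;
    }
}

final class KinesisProgressSketch extends ProgressSketch {
    final String sequenceNumber;

    KinesisProgressSketch(String sequenceNumber) {
        this.sequenceNumber = sequenceNumber;
    }
}

final class AttachmentSerializerSketch {
    private AttachmentSerializerSketch() {}

    // Kafka-only path: throws ClassCastException for any non-Kafka progress.
    static String kafkaOnly(ProgressSketch progress) {
        KafkaProgressSketch kafka = (KafkaProgressSketch) progress;
        return "kafka:" + kafka.offset;
    }

    // Type-dispatching path: handles both sources and rejects unknown ones explicitly.
    static String dispatching(ProgressSketch progress) {
        if (progress instanceof KafkaProgressSketch) {
            return "kafka:" + ((KafkaProgressSketch) progress).offset;
        }
        if (progress instanceof KinesisProgressSketch) {
            return "kinesis:" + ((KinesisProgressSketch) progress).sequenceNumber;
        }
        throw new IllegalArgumentException(
                "unsupported progress type: " + progress.getClass().getName());
    }

    public static void main(String[] args) {
        ProgressSketch kinesis = new KinesisProgressSketch("42");
        System.out.println(dispatching(kinesis));
        try {
            kafkaOnly(kinesis);
        } catch (ClassCastException e) {
            System.out.println("Kafka-only cast failed for Kinesis progress");
        }
    }
}
```

Rejecting Kinesis jobs early when cloud mode is enabled would be the other option the review mentions; either way the failure becomes explicit instead of surfacing at the first commit.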

Comment thread be/src/load/stream_load/stream_load_executor.cpp Outdated
Comment thread regression-test/conf/regression-conf.groovy Outdated
@0AyanamiRei 0AyanamiRei requested a review from w41ter as a code owner April 8, 2026 21:47
…rganization

Fix test file imports to use new package paths:
- kafka.KafkaProgress, kafka.KafkaRoutineLoadJob, kafka.KafkaTaskInfo
- kinesis.KinesisProgress, kinesis.KinesisRoutineLoadJob

This fixes compilation errors in test files after moving Kafka and Kinesis
classes to their respective subdirectories.
### What problem does this PR solve?

Issue Number: None

Related PR: None

Problem Summary: Fix FE compilation failures introduced by the routine load package migration (kafka/kinesis subpackages) by aligning method visibility for cross-package overrides and adding the missing imports in routine load sources and related FE tests.

### Release note

None

### Check List (For Author)

- Test: Manual test
    - Manual test: FE build `FE_MAVEN_THREADS=16 FE_MAVEN_RETRY_THREADS=4 DISABLE_BUILD_UI=ON DISABLE_BUILD_HIVE_UDF=ON DISABLE_BE_JAVA_EXTENSIONS=ON DISABLE_JAVA_CHECK_STYLE=ON MAVEN_OPTS='-Xms1g -Xmx4g -XX:+UseG1GC -XX:ActiveProcessorCount=16' MVN_OPT='-U -s /tmp/doris-maven-settings-doris.xml' bash build.sh --fe -j 16` (BUILD SUCCESS)
- Behavior changed: No (compile and test-compile compatibility fix)
- Does this need documentation: No
### What problem does this PR solve?

Issue Number: None

Related PR: None

Problem Summary: FE checkstyle failed with import ordering, unused import, and redundant same-package import errors in routine load and related tests.

### Release note

None

### Check List (For Author)

- Test: Manual test

    - Manual test (user run): ./build.sh --fe -j16

- Behavior changed: No

- Does this need documentation: No
@sollhui
Contributor

sollhui commented Apr 10, 2026

run buildall

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 33.59% (303/902) 🎉
Increment coverage report
Complete coverage report

@sollhui
Contributor

sollhui commented Apr 24, 2026

run buildall

@0AyanamiRei
Contributor Author

run buildall

@hello-stephen
Contributor

Cloud UT Coverage Report

Increment line coverage 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 78.06% (1843/2361)
Line Coverage 64.75% (32987/50947)
Region Coverage 65.27% (16372/25085)
Branch Coverage 55.82% (8737/15652)

@0AyanamiRei
Contributor Author

run buildall

@hello-stephen
Contributor

Cloud UT Coverage Report

Increment line coverage 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 78.06% (1843/2361)
Line Coverage 64.77% (33000/50947)
Region Coverage 65.30% (16380/25085)
Branch Coverage 55.83% (8739/15652)

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 33.59% (303/902) 🎉
Increment coverage report
Complete coverage report

### What problem does this PR solve?

Issue Number: None

Related PR: None

Problem Summary: Kinesis conf relied on indirect precompiled-header includes for Status and Doris logging macros, which can fail in non-PCH builds.

### Release note

None

### Check List (For Author)

- Test: Manual test
    - Header self-contained syntax check for kinesis_conf.h
    - Non-PCH syntax check for kinesis_conf.cpp
- Behavior changed: No
- Does this need documentation: No
@0AyanamiRei
Contributor Author

run buildall

@0AyanamiRei
Contributor Author

run buildall

@hello-stephen
Contributor

FE Regression Coverage Report

Increment line coverage 1.09% (10/914) 🎉
Increment coverage report
Complete coverage report

@hello-stephen
Contributor

Cloud UT Coverage Report

Increment line coverage 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 78.06% (1843/2361)
Line Coverage 64.74% (32984/50947)
Region Coverage 65.25% (16367/25085)
Branch Coverage 55.77% (8729/15652)

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 33.59% (303/902) 🎉
Increment coverage report
Complete coverage report

@hello-stephen
Contributor

FE Regression Coverage Report

Increment line coverage 1.10% (10/906) 🎉
Increment coverage report
Complete coverage report

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 17.04% (158/927) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 71.92% (27027/37579)
Line Coverage 55.16% (287425/521080)
Region Coverage 52.08% (237668/456341)
Branch Coverage 53.58% (102808/191860)

@sollhui
Contributor

sollhui commented Apr 28, 2026

/review

@0AyanamiRei
Contributor Author

@sollhui Except for two Docker cases, all other tests have passed locally.

Contributor

@sollhui sollhui left a comment

LGTM

@github-actions
Contributor

PR approved by anyone and no changes requested.
