Skip to content

[FLINK-20625] Implement V2 Google Cloud PubSub Source in accordance with FLIP-27#32

Open
clmccart wants to merge 13 commits into
apache:mainfrom
clmccart:final
Open

[FLINK-20625] Implement V2 Google Cloud PubSub Source in accordance with FLIP-27#32
clmccart wants to merge 13 commits into
apache:mainfrom
clmccart:final

Conversation

@clmccart
Copy link
Copy Markdown

@clmccart clmccart commented Oct 8, 2024

TLDR;

  • use StreamingPull instead of a synchronous pull to improve performance
  • implemented using the new Flip-27 interfaces (Source, SplitEnumerator, and SourceReader)

PubSubSplitEnumerator:

  • Since there is no limit in PubSub to the number of subscriber clients, there is no real concept of “split discovery”.
  • Therefore, a "split" in this implementation is simply represented by a single subscriber client for a single subscription.
  • The number of splits is determined by the parallelism of the job.
  • The PubSubSplitEnumerator monitors the number of SourceReaders that are added. It assigns a single split to every SourceReader, meaning each SourceReader will be pulling messages from Pub/Sub in parallel.

PubSubSourceReader:

  • The PubSubSplitReader wraps a Subscriber client from the Cloud Pub/Sub Java client library to asynchronously receive messages from Pub/Sub.
  • Single Subscriber client per source reader
  • Responsible for acknowledging messages
  • There are two main benefits to using the client library:
    • The Subscriber client uses StreamingPull to asynchronously receive messages and provide maximum throughput
    • The client library handles ack management, and automatically extends ack deadlines while processing messages
  • Uses checkpointing to acknowledge messages.
    • When a checkpoint completes, all outstanding messages will be acknowledged. Because acknowledgements are required, the v2 source requires checkpointing to be enabled.
  • flow control
    • max outstanding bytes: 100MB
    • max outstanding elements: defaults to 1000

Adds an example pipeline using the new source and adds the "Deprecated" tag to the old source implementation

Additional context:

Note that documentation will be updated in a separate PR

hannahrogers-google and others added 3 commits October 8, 2024 20:51
fix: remove unwanted changes

feat: create basic split enumerator

fix: requested changes

Update split.proto

Update PubSubSink.java

Update PubSubNotifyingPullSubscriber.java

Update PubSubSinkTest.java

Update PubSubCheckpointSerializerTest.java

Update PubSubSplitEnumeratorTest.java

feat: create pubsub source

fix: comment

Fix a typo in the ordering key prober readme (#353)

Create a parent pom.xml and restructure the flink-connector source code to support directories for integration tests and sample code. Also, replace Java's Optional class with Guava's Optional, which is serializable, to enable starting a Flink job that uses CPS as a source.

Add a WIP example of using CPS as a Flink source.

Clean up POM files.

Change example code to use both a PubSub sink and source.

Fix interrupt not returning empty messages

Add flow control setting to source builder.

Remove limitExceededBehavior source option

Create separate unit tests

Add PubSubSource integration test using CPS emulator

Remove emulatorEndpoint source builder option

Replace PubSubEmulatorTestBase with PubSubEmulatorHelper

Add initial documentation for Flink connector

Address PR comments

Add sink/it testing doc

Add builder options to source/sink

Document source/sink builder options

Add at-least-once delivery guarantee for CPS sink

Update README with connector status

Improve example Flink job and documentation

Update library version to 1.0.0-SNAPSHOT

Use docker to start CPS emulator in it tests

Add PubSubSinkEmulatorTest

Do not add source messages to a checkpoint until after it is emitted (#369)

Add contribution guide for CPS flink connector (#370)
gitignore

move examples into streaming folder

move sourcev2 example into right folder. does not compile

big move + proto compile

move to correct packages

import FixedHeaderProvider

add autovalue to pom

compiles

tests compile

add vscode files to gitignore

fix example pipeline

builds

revert flink-examples-streaming-gcp-pubsub pom

java.lang.NoClassDefFoundError: org/apache/flink/shaded/io/netty/channel/ChannelFactory

pipeline works
@boring-cyborg
Copy link
Copy Markdown

boring-cyborg Bot commented Oct 8, 2024

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@clmccart clmccart force-pushed the final branch 8 times, most recently from 84cc42f to 3af0b29 Compare October 9, 2024 15:59
@clmccart clmccart changed the title Final Donate PubSub source and sink rewrite back to Apache Oct 9, 2024
Comment thread .gitignore Outdated
@clmccart clmccart changed the title Donate PubSub source and sink rewrite back to Apache [FLINK-20625] Implement V2 Google Cloud PubSub Source in accordance with FLIP-27 Oct 9, 2024
@clmccart
Copy link
Copy Markdown
Author

clmccart commented Oct 9, 2024

looks like the sink rewrite was completed in FLINK-24298. will need to take a look at that and probably remove the sink changes from this PR

@clmccart
Copy link
Copy Markdown
Author

looks like the sink rewrite was completed in FLINK-24298. will need to take a look at that and probably remove the sink changes from this PR

the main reason we wrote a new sink implementation was because the previous implementation was deprecated. we didnt make any significant performance improvements so i went ahead and remove the sink rewrite from this PR. we can followup with changes to the sink implementation in a subsequent PR but no changes are pressing.

@clmccart clmccart marked this pull request as ready for review November 18, 2024 17:32
@clmccart
Copy link
Copy Markdown
Author

@snuyanzin would you mind taking a look at this PR? or is there someone else who might be a better fit?

@rob-apella
Copy link
Copy Markdown

rob-apella commented Jun 3, 2025

@clmccart are you still interested in getting this through? @vahmed-hamdy is this something you can help review? we're hitting some limits with the blocking subscriber. I'm happy to take a look but I'm not proficient in this repo. thanks 🙏

@clmccart
Copy link
Copy Markdown
Author

clmccart commented Jun 4, 2025

@clmccart are you still interested in getting this through? @vahmed-hamdy is this something you can help review? we're hitting some limits with the blocking subscriber. I'm happy to take a look but I'm not proficient in this repo. thanks 🙏

yeah i'm happy to push it through if there is a reviewer

@mwinkels
Copy link
Copy Markdown

Hi @clmccart ,

We are trying to use this code with our Flink setup and we found a couple of issues.

First, we are migrating to Flink v2.0 and had to adjust the code - mostly remove the classes that are no longer supported.

This is what we found:

  1. the streaming pull works fine and with Flink v2.0 job restarts are much faster, which leads to better performance overall.
  2. the PubSubSplitEnumerator could be completely stateless. GCP PubSub does not support splits that can be referenced externally, so there is no need to maintain any state on the connector side for assigning splits to readers.
  3. the SubscriptionSplit-state is similarly not really necessary, since there is only one subscription for a source and there is no meaningful split. The current implementation is harmful, since it stores the task-id in the split and when it recovers, there is a potential of re-creating subscriptions that are not part of an active reader. We remove the uid field from the SubscriptionSplitProto to fix this issue.
  4. the PubSubSplitReader is more complex then it needs to be. A SplitReader instance is always bound to a single source task, so it only ever needs a single subscription for the single split that it will received. The Map<Split, Subscription> that is currently in the PubSubSplitReader should only ever hold one instance. We replaced it with an instance field of type NotifyingPullSubscriber.

I hope these findings will help to improve this MR and get it to the finish line.

Best,
-Maarten Winkels

P.S.: I'm not sure how to contribute changes back, since they are Flink v2 based.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants