Skip to content

Add GRPC Streaming producer#109

Open
rockneurotiko wants to merge 24 commits into
elixir-broadway:mainfrom
rockneurotiko:pub_sub_streaming
Open

Add GRPC Streaming producer#109
rockneurotiko wants to merge 24 commits into
elixir-broadway:mainfrom
rockneurotiko:pub_sub_streaming

Conversation

@rockneurotiko
Copy link
Copy Markdown
Contributor

Hey!

Sorry for the big PR, I talked with @josevalim and @wojtekmach at the Elixir Conf EU about opening this PR or creating an off_broadway_cloud_pub_sub library, and they agreed that it's worth opening the PR and see where it leads.

In my team we use google pubsub a lot, to the point that the pull method is not enough, specifically for these reasons:

  • A lot of open connections to google, it makes the observability hard to know what's happening and it generate a lot of traffic (HTTP and SSL data transfer overhead)

  • There is no way to extend the lease while processing messages, this makes that if you have a slow (or sometimes slow) subscriber, you have to set a high deadline for topic

  • On shutdown, the http call is just killed (because it can't wait the long-poll to finish in 30 seconds), and google don't handle it properly (not in HTTP 1.1 nor HTTP 2), and if there were messages that were going to be sent to that http call, pubsub thinks it was sent, making it go to the deadline and fail (and retry or not, depends on the setting). This together with the high deadline in the point before makes that on deployments/restarts, some messages get delayed by the full long deadline.

Since this is the official broadway library for pubsub, I think it should support grpc streaming. All official google's pubsub libraries (I think all except PHP) use GRPC streaming as default, they usually provide http client code to do pull requests manually, but the high level abstractions don't have support for it (except Go). In the docs they recommend using it if possible and that http don't guarantee low latency or high throughput).

This PR adds GRPC Streaming producer, it's totally optional to use it, and the existing pull producer is not changed, so the library users don't have breaking changes and they have to opt-in for the streaming producer instead of the pull one.

This is all the features that the streaming producer has, to have feature-parity with the official libraries:

  • Dual GRPC connections, one for reading the messages (the bidi StreamingPull), and another for sending acks, modAcks and lease extensions (it's called unary connection). Both reconnect independently, so acks can keep flowing while the stream is reconnecting, and stream traffic doesn't head-of-line-block the ack traffic.

  • Server-side flow control via max_outstanding_messages and max_outstanding_bytes (the latter defaults to 100 MiB). The server stops pushing when either limit is hit, so memory pressure is bounded regardless of how fast the downstream processors are.

  • Backpressure integrated with Broadway/GenStage demand. Internally the producer tracks "outstanding" messages (dispatched, waiting for ack) and "buffered" messages (received from the stream, waiting for downstream demand). Buffered messages survive reconnects, and pending_demand is preserved across reconnects too: GenStage only emits new demand when capacity opens up, so resetting it on reconnect would deadlock the pipeline.

  • Automatic lease extension while messages are being processed, with adaptive p99-based ack deadlines (so a slow subscriber doesn't keep getting redeliveries, but a fast one doesn't waste modAck calls either). Capped by max_extension_ms (default 60 minutes) so a stuck consumer can't hold messages forever and eventually lets the server redeliver them to another subscriber.

  • Reconnection handling with configurable backoff (:rand_exp, :exp, :rand, or :stop), with backoff_min/backoff_max defaulting to 100ms / 60s. Randomised exponential backoff is the default to avoid thundering herd after a mass disconnect (e.g. a load balancer drain). Outstanding and buffered messages are preserved across reconnects; the producer only shuts down after repeated non recoverable errors (tracked by a RetryTracker).

  • Batched acks and modAcks via the unary connection, with configurable ack_batch_interval_ms and ack_batch_max_size (capped at the Google API limit of 2,500 ack_ids per RPC). ModAcks are grouped by deadline value because a ModifyAckDeadline RPC carries a single deadline for all of its ack_ids, so grouping minimises the number of RPCs per flush cycle.

  • Exactly-once delivery support, auto-detected from SubscriptionProperties at runtime. When enabled:

    • The retry deadline for ack/modAck RPCs is automatically bumped from 60s to 600s, because with exactly-once enabled the server can return transient errors for extended periods while it guarantees exactly-once semantics, and giving up early would cause spurious redeliveries.
    • Messages are gated on a receipt modAck: they aren't dispatched to processors until the server confirms receipt, so if the receipt fails we can nack and let the server redeliver rather than risk a duplicate.
    • If the property flips back to disabled at runtime, the configured retry deadline is restored.
  • Message ordering support via enable_message_ordering. Uses Broadway's built-in partition_by so messages with the same ordering_key go to the same processor and are processed sequentially. A stable client_id is propagated to all stream managers so the server can use sticky assignment, which is required for ordered subscriptions to keep delivering the same ordering key to the same subscriber across reconnects.

  • Graceful shutdown with a configurable drain_timeout_ms (default 30s): stop pulling new messages, wait for outstanding to be acked, then nack the remainder per the on_shutdown setting (:noop, :nack, or {:nack, seconds}, defaulting to {:nack, 5} to avoid thundering herd on rolling deploys), then close the stream cleanly with CloseSend. This is what makes restarts not lose latency on in-flight messages, the third bullet point at the top of this PR.

  • Configurable ack behaviour per message outcome via on_success / on_failure (:ack, :noop, or {:nack, seconds}), keeping the same shape as the existing pull producer's API so users switching producers don't have to relearn it.

  • N:N producer-manager topology: each Broadway producer has its own StreamManager (owning one StreamingPull stream) and shares a single UnaryAckSupervisor per producer, supervising AckBatcher and UnaryRpcClient. This scales horizontally: more producers means more streams to Google, each with their own flow control budget, which is the only way to get past the per-stream throughput ceiling.

  • Pluggable HTTP/2 adapter (:gun default, :mint, or any module implementing GRPC.Client.Adapter). Both built-in adapters are provided by the grpc library. HTTP/2 PING keepalives are sent every 30s on the :gun adapter to keep Google's load balancer from closing idle connections, which it does after roughly 20s by default.

  • Pluggable gRPC client behaviour (BroadwayCloudPubSub.Streaming.Client): the default GrpcClient wraps the grpc library, but it can be swapped out for tests or alternative transports without touching the producer.

  • gRPC client-side interceptors (interceptors option) attached to both channels, useful for logging, tracing, custom auth, etc. Standard GRPC.Client.Interceptor behaviour.

  • Pluggable auth via goth / token_generator (same MFArgs API as the existing pull producer), with the token re-fetched on every (re)connection so short-lived tokens don't expire on a long-lived stream.

  • Pub/Sub emulator support via grpc_endpoint and use_ssl (set to "localhost:8085" and false respectively to point at the local emulator, same setup we use for the integration tests).

  • Telemetry events that allow for great observability: stream lifecycle (connect, disconnect, reconnect, drain), ack batch flush timings and sizes, unary RPC spans, lease extension cycles, outstanding/buffered message gauges, pending demand and drain status. The telemetry_metadata option lets users attach static or dynamic (MFA) extra metadata to every event, so the events plug straight into Datadog / Prometheus / etc. without per-call boilerplate.

  • Minimal generated protobuf surface: the proto generation is filtered down to the three RPCs we actually use (StreamingPull, Acknowledge, ModifyAckDeadline), going from 124 generated modules / 2647 lines down to 12 modules / 265 lines. The full Subscriber service has 17 RPCs plus Publisher and SchemaService, and pulling all of them in would bloat compile times and conflict with the googleapis Hex dep's google.api.* annotation modules. Reproducible via buf generate.

I know that it's a big PR, but I would love to start the conversation and the review. If agreed to have it in the library, I would wait until grpc 1.0 is released, but the PR can be reviewed, it uses the release candidate at the moment.

I've been working on this for more a couple of months and we've been running it in production for some weeks now, I can share in private datadog screenshots showing how it performs and the potential of the telemetry events. I also have integration and stress tests that require the emulator (or real pubsub), I haven't included them on the PR but I can share them.

On top of the PR itself, I have a few internal documents that didn't seem worth pushing to the branch but I'm happy to share if useful, they capture the non obvious architectural rationale (Mint stream cancellation quirks, reconnect deduplication, exactly once receipt gating, why we don't dedupe ack_ids intra stream, etc.), a comparison of the official Go / Java / Python / Node.js clients that informed a lot of the defaults, and an architecture write up of how the supervision tree, stream manager and ack pipeline fit together. Just let me know and I'll send them over, or inline whatever bits would help the review.

Implement the foundation for a gRPC-based StreamingPull producer as an
alternative to the existing REST-based pull producer. This provides
low-latency, push-based message delivery from Google Cloud Pub/Sub.

Key additions:
- Streaming.Producer GenStage producer with Broadway integration
- Streaming.StreamManager for managing bidirectional gRPC streams
- Streaming.Acknowledger for message acknowledgment
- Streaming.Options for configuration validation
- MessageBuilder for constructing Broadway messages from Pub/Sub responses
- Protobuf definitions for the Pub/Sub v1 API
- Backoff module for exponential retry logic
- HTTP/2 connection handling with Mint adapter support
Extract the stream reading logic into a dedicated StreamReader module,
separating concerns between connection management and message reading.
Add error classification for gRPC response codes and improve the overall
reliability of the stream manager.

Key changes:
- Extract StreamReader module from StreamManager
- Add ErrorClassifier for categorizing gRPC errors
- Improve stream manager test coverage
- Clean up module interfaces and responsibilities
Implement the acknowledgment pipeline using unary RPCs for batched
message acknowledgment and deadline modification. This replaces
stream-based acks with a more reliable unary RPC approach.

Key additions:
- AckBatcher for batching acknowledge/nack/deadline operations
- AckResult for tracking acknowledgment outcomes
- AckTimeDistribution for adaptive ack deadline estimation
- UnaryRpcClient for making acknowledge and modifyAckDeadline RPCs
- UnaryAckSupervisor for managing concurrent ack tasks
- Stress test suite for high-throughput scenarios
Add support for graceful draining of streaming pull connections,
ensuring in-flight messages are properly acknowledged before shutdown.
Introduce partition-based message routing for multi-processor topologies.

Key changes:
- Implement prepare_for_draining callback in the producer
- Add drain_timeout option for controlling shutdown behavior
- Close inner gRPC streams during draining
- Add partition_by support in the producer for message routing
- Add gRPC test adapters for draining behavior verification
Introduce synchronous acknowledgment mode and improve error handling
for failed and cancelled gRPC operations. Harden the producer for
production use with better stream lifecycle management.

Key changes:
- Add synchronous ack mode for at-least-once delivery guarantees
- Classify FAILED_PRECONDITION and CANCELLED errors appropriately
- Simplify producer and stream manager internal state
- Improve ack batcher with configurable batch sizes
- Remove unused streaming pull response handling code
Introduce a client behaviour (Streaming.Client) and default gRPC
implementation (Streaming.GrpcClient) to decouple the stream manager
from the gRPC transport. Regenerate protobuf definitions using buf.

Key changes:
- Add Streaming.Client behaviour for pluggable transport
- Add Streaming.GrpcClient as the default implementation
- Add buf.gen.yaml and buf.yaml for protobuf generation
- Regenerate Pub/Sub v1 protobuf modules
- Update StreamManager and UnaryRpcClient to use client abstraction
Add comprehensive telemetry events for the streaming producer with
rich metadata including subscription, project, and topic information.
Emit events for stream connections, message receipt, acknowledgments,
and errors to enable observability and monitoring.

Key changes:
- Add Streaming.Telemetry module with event definitions
- Attach metadata (subscription, project_id) to all telemetry events
- Instrument GrpcClient, AckBatcher, and UnaryRpcClient
- Add TelemetryHelper test support module
- Bump grpc dependency to ~> 1.0.0-rc.1
Add support for gRPC channel interceptors and improve the options
handling across the streaming producer modules. Allow users to
configure custom interceptors for logging, metrics, or auth middleware.

Key changes:
- Add interceptors option for gRPC channel configuration
- Centralize options management through Streaming.Options
- Support modifyAckDeadline with configurable deadlines
- Pass options struct consistently through all streaming modules
Redesign the producer-manager relationship to support N producers each
with their own dedicated stream manager (1:1 mapping). Improve the
draining lifecycle with nack-on-timeout behavior and extract focused
modules for lease management and message dispatch.

Key changes:
- Implement 1:1 producer-to-stream-manager topology
- Move prepare_for_draining logic into StreamManager
- Add nack on drain_timeout for unprocessed messages
- Extract LeaseManager for ack deadline extension scheduling
- Extract MessageDispatch for message delivery logic
- Centralize demand management in StreamManager
- Simplify producer to delegate stream concerns to manager
Add module documentation, typespecs, and inline comments across all
streaming producer modules. Update README with StreamingPull producer
usage instructions and configuration examples.

Key changes:
- Add @moduledoc and function docs to streaming modules
- Add typespecs to public APIs
- Document configuration options in Streaming.Options
- Update README with Streaming.Producer setup guide
- Update mix.exs with grpc dependency documentation
… coverage

Refactor the streaming producer internals for better reliability and
maintainability. Add RetryTracker module for per-ack-ID retry state
management with deadline and attempt-limit enforcement.

Key changes:
- Add RetryTracker for tracking retry state per ack ID
- Refactor StreamManager connection handling and reconnect logic
- Improve AckBatcher with retry-aware classification functions
- Simplify reset_connection interface (remove unused reason param)
- Refactor LeaseManager and MessageDispatch module interfaces
- Add comprehensive tests for LeaseManager, MessageDispatch,
  RetryTracker, and Acknowledger
Configure buf to generate only the Subscriber service protobuf code,
removing unused schema definitions and reducing generated code size.

Key changes:
- Add buf.gen.yaml with opt targeting Subscriber service
- Remove unused schema.pb.ex (Pub/Sub schema API)
- Reduce pubsub.pb.ex from full API surface to subscriber-only
@josevalim
Copy link
Copy Markdown
Contributor

Hi @rockneurotiko! Thank you for the pull request!

Would you like to become a maintainer of this project, given you are already maintaining a fork anyway? One option is to release the current version as v1.0 and then you can start the GRPC as v2.0, maintained by you/your team.

@rockneurotiko
Copy link
Copy Markdown
Contributor Author

@josevalim Sure! I'm more than happy to become a mantainer of the library.

The versioning idea sounds great, even the 1.0 could be with both since the existing producer is not changed at all, but the double version is a safe approach that makes sense

@josevalim
Copy link
Copy Markdown
Contributor

@rockneurotiko I have invited you to the new organization. I will branch and ship v1.0 and then you can proceed with v2.0. It is up to you if you want to keep maintaining the current interface!

@rockneurotiko
Copy link
Copy Markdown
Contributor Author

@rockneurotiko I have invited you to the new organization. I will branch and ship v1.0 and then you can proceed with v2.0. It is up to you if you want to keep maintaining the current interface!

@josevalim Great, I'll pick it from there. It seems that the CI didn't work, today github had issues with the actions, maybe it's worth retrying them.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants