Add GRPC Streaming producer #109
Implement the foundation for a gRPC-based StreamingPull producer as an alternative to the existing REST-based pull producer. This provides low-latency, push-based message delivery from Google Cloud Pub/Sub.
Key additions:
- Streaming.Producer GenStage producer with Broadway integration
- Streaming.StreamManager for managing bidirectional gRPC streams
- Streaming.Acknowledger for message acknowledgment
- Streaming.Options for configuration validation
- MessageBuilder for constructing Broadway messages from Pub/Sub responses
- Protobuf definitions for the Pub/Sub v1 API
- Backoff module for exponential retry logic
- HTTP/2 connection handling with Mint adapter support

Extract the stream reading logic into a dedicated StreamReader module, separating concerns between connection management and message reading. Add error classification for gRPC response codes and improve the overall reliability of the stream manager.
Key changes:
- Extract StreamReader module from StreamManager
- Add ErrorClassifier for categorizing gRPC errors
- Improve stream manager test coverage
- Clean up module interfaces and responsibilities

Implement the acknowledgment pipeline using unary RPCs for batched message acknowledgment and deadline modification. This replaces stream-based acks with a more reliable unary RPC approach.
Key additions:
- AckBatcher for batching acknowledge/nack/deadline operations
- AckResult for tracking acknowledgment outcomes
- AckTimeDistribution for adaptive ack deadline estimation
- UnaryRpcClient for making acknowledge and modifyAckDeadline RPCs
- UnaryAckSupervisor for managing concurrent ack tasks
- Stress test suite for high-throughput scenarios

Add support for graceful draining of streaming pull connections, ensuring in-flight messages are properly acknowledged before shutdown. Introduce partition-based message routing for multi-processor topologies.
Key changes:
- Implement prepare_for_draining callback in the producer
- Add drain_timeout option for controlling shutdown behavior
- Close inner gRPC streams during draining
- Add partition_by support in the producer for message routing
- Add gRPC test adapters for draining behavior verification

Introduce synchronous acknowledgment mode and improve error handling for failed and cancelled gRPC operations. Harden the producer for production use with better stream lifecycle management.
Key changes:
- Add synchronous ack mode for at-least-once delivery guarantees
- Classify FAILED_PRECONDITION and CANCELLED errors appropriately
- Simplify producer and stream manager internal state
- Improve ack batcher with configurable batch sizes
- Remove unused streaming pull response handling code

Introduce a client behaviour (Streaming.Client) and default gRPC implementation (Streaming.GrpcClient) to decouple the stream manager from the gRPC transport. Regenerate protobuf definitions using buf.
Key changes:
- Add Streaming.Client behaviour for pluggable transport
- Add Streaming.GrpcClient as the default implementation
- Add buf.gen.yaml and buf.yaml for protobuf generation
- Regenerate Pub/Sub v1 protobuf modules
- Update StreamManager and UnaryRpcClient to use client abstraction

Add comprehensive telemetry events for the streaming producer with rich metadata including subscription, project, and topic information. Emit events for stream connections, message receipt, acknowledgments, and errors to enable observability and monitoring.
Key changes:
- Add Streaming.Telemetry module with event definitions
- Attach metadata (subscription, project_id) to all telemetry events
- Instrument GrpcClient, AckBatcher, and UnaryRpcClient
- Add TelemetryHelper test support module
- Bump grpc dependency to ~> 1.0.0-rc.1

Add support for gRPC channel interceptors and improve the options handling across the streaming producer modules. Allow users to configure custom interceptors for logging, metrics, or auth middleware.
Key changes:
- Add interceptors option for gRPC channel configuration
- Centralize options management through Streaming.Options
- Support modifyAckDeadline with configurable deadlines
- Pass options struct consistently through all streaming modules

Redesign the producer-manager relationship to support N producers each with their own dedicated stream manager (1:1 mapping). Improve the draining lifecycle with nack-on-timeout behavior and extract focused modules for lease management and message dispatch.
Key changes:
- Implement 1:1 producer-to-stream-manager topology
- Move prepare_for_draining logic into StreamManager
- Add nack on drain_timeout for unprocessed messages
- Extract LeaseManager for ack deadline extension scheduling
- Extract MessageDispatch for message delivery logic
- Centralize demand management in StreamManager
- Simplify producer to delegate stream concerns to manager

Add module documentation, typespecs, and inline comments across all streaming producer modules. Update README with StreamingPull producer usage instructions and configuration examples.
Key changes:
- Add @moduledoc and function docs to streaming modules
- Add typespecs to public APIs
- Document configuration options in Streaming.Options
- Update README with Streaming.Producer setup guide
- Update mix.exs with grpc dependency documentation

… coverage
Refactor the streaming producer internals for better reliability and maintainability. Add RetryTracker module for per-ack-ID retry state management with deadline and attempt-limit enforcement.
Key changes:
- Add RetryTracker for tracking retry state per ack ID
- Refactor StreamManager connection handling and reconnect logic
- Improve AckBatcher with retry-aware classification functions
- Simplify reset_connection interface (remove unused reason param)
- Refactor LeaseManager and MessageDispatch module interfaces
- Add comprehensive tests for LeaseManager, MessageDispatch, RetryTracker, and Acknowledger

Configure buf to generate only the Subscriber service protobuf code, removing unused schema definitions and reducing generated code size.
Key changes:
- Add buf.gen.yaml with opt targeting Subscriber service
- Remove unused schema.pb.ex (Pub/Sub schema API)
- Reduce pubsub.pb.ex from full API surface to subscriber-only
Hi @rockneurotiko! Thank you for the pull request! Would you like to become a maintainer of this project, given you are already maintaining a fork anyway? One option is to release the current version as v1.0 and then you can start the GRPC as v2.0, maintained by you/your team.
@josevalim Sure! I'm more than happy to become a maintainer of the library. The versioning idea sounds great. v1.0 could even ship with both producers, since the existing producer is not changed at all, but having two versions is a safe approach that makes sense.
@rockneurotiko I have invited you to the new organization. I will branch and ship v1.0 and then you can proceed with v2.0. It is up to you if you want to keep maintaining the current interface!
@josevalim Great, I'll pick it up from there. It seems that the CI didn't run properly: GitHub had issues with Actions today, so it may be worth retrying the jobs.
Hey!
Sorry for the big PR. I talked with @josevalim and @wojtekmach at ElixirConf EU about whether to open this PR or create a separate off_broadway_cloud_pub_sub library, and they agreed that it's worth opening the PR and seeing where it leads.
My team uses Google Pub/Sub a lot, to the point that the pull method is not enough, specifically for these reasons:
- It keeps a lot of connections open to Google, which makes it hard to observe what's happening and generates a lot of traffic (HTTP and SSL data transfer overhead).
- There is no way to extend the lease while processing messages, so if you have a slow (or sometimes slow) subscriber, you have to set a high ack deadline on the subscription.
- On shutdown, the HTTP call is just killed (because it can't wait up to 30 seconds for the long poll to finish), and Google doesn't handle this properly (neither in HTTP/1.1 nor in HTTP/2). If there were messages about to be sent on that HTTP call, Pub/Sub thinks they were delivered, so they sit until the deadline expires and then fail (and are retried or not, depending on the settings). Combined with the high deadline from the previous point, this means that on deployments/restarts some messages get delayed by the full long deadline.
Since this is the official Broadway library for Pub/Sub, I think it should support gRPC streaming. All of Google's official Pub/Sub libraries (I think all except PHP) use gRPC streaming by default. They usually provide HTTP client code for issuing pull requests manually, but the high-level abstractions don't support it (except Go). The docs recommend using streaming where possible and note that HTTP pull doesn't guarantee low latency or high throughput.
This PR adds a gRPC streaming producer. Using it is entirely optional and the existing pull producer is unchanged, so there are no breaking changes for library users: they have to opt in to the streaming producer instead of the pull one.
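As a rough idea of what opting in might look like, here is a minimal sketch of a Broadway pipeline using the streaming producer. It makes several assumptions: the full module name `BroadwayCloudPubSub.Streaming.Producer` is inferred from the commit messages and the `BroadwayCloudPubSub.Streaming.Client` namespace, the `subscription` option is assumed to mirror the existing pull producer, and `MyApp.Pipeline`, `MyApp.Goth` and the subscription path are placeholders. The README in the PR is the authoritative reference.

```elixir
defmodule MyApp.Pipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          # Assumed module name; swap in the pull producer module to opt out.
          {BroadwayCloudPubSub.Streaming.Producer,
           # Assumed to mirror the pull producer's `subscription` option.
           subscription: "projects/my-project/subscriptions/my-subscription",
           # `goth` is listed among the auth options; MyApp.Goth is a placeholder.
           goth: MyApp.Goth},
        concurrency: 2
      ],
      processors: [default: [concurrency: 10]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # Returning the message unchanged marks it as successful.
    message
  end
end
```

With `concurrency: 2`, Broadway would start two producers, which under the topology described in this PR would mean two StreamingPull streams.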
These are the features of the streaming producer, chosen for feature parity with the official libraries:
- Dual gRPC connections: one for reading messages (the bidi `StreamingPull`) and another for sending acks, modAcks and lease extensions (called the unary connection). Both reconnect independently, so acks can keep flowing while the stream is reconnecting, and stream traffic doesn't head-of-line-block the ack traffic.
- Server-side flow control via `max_outstanding_messages` and `max_outstanding_bytes` (the latter defaults to 100 MiB). The server stops pushing when either limit is hit, so memory pressure is bounded regardless of how fast the downstream processors are.
- Backpressure integrated with Broadway/GenStage demand. Internally the producer tracks "outstanding" messages (dispatched, waiting for ack) and "buffered" messages (received from the stream, waiting for downstream demand). Buffered messages survive reconnects, and `pending_demand` is preserved across reconnects too: GenStage only emits new demand when capacity opens up, so resetting it on reconnect would deadlock the pipeline.
- Automatic lease extension while messages are being processed, with adaptive p99-based ack deadlines (so a slow subscriber doesn't keep getting redeliveries, but a fast one doesn't waste modAck calls either). Capped by `max_extension_ms` (default 60 minutes) so a stuck consumer can't hold messages forever and eventually lets the server redeliver them to another subscriber.
- Reconnection handling with configurable backoff (`:rand_exp`, `:exp`, `:rand`, or `:stop`), with `backoff_min` / `backoff_max` defaulting to 100ms / 60s. Randomised exponential backoff is the default to avoid a thundering herd after a mass disconnect (e.g. a load balancer drain). Outstanding and buffered messages are preserved across reconnects; the producer only shuts down after repeated non-recoverable errors (tracked by a `RetryTracker`).
- Batched acks and modAcks via the unary connection, with configurable `ack_batch_interval_ms` and `ack_batch_max_size` (capped at the Google API limit of 2,500 ack_ids per RPC). ModAcks are grouped by deadline value because a `ModifyAckDeadline` RPC carries a single deadline for all of its ack_ids, so grouping minimises the number of RPCs per flush cycle.
- Exactly-once delivery support, auto-detected from `SubscriptionProperties` at runtime. When enabled:
- Message ordering support via `enable_message_ordering`. Uses Broadway's built-in `partition_by` so messages with the same `ordering_key` go to the same processor and are processed sequentially. A stable `client_id` is propagated to all stream managers so the server can use sticky assignment, which is required for ordered subscriptions to keep delivering the same ordering key to the same subscriber across reconnects.
- Graceful shutdown with a configurable `drain_timeout_ms` (default 30s): stop pulling new messages, wait for outstanding messages to be acked, then nack the remainder per the `on_shutdown` setting (`:noop`, `:nack`, or `{:nack, seconds}`, defaulting to `{:nack, 5}` to avoid a thundering herd on rolling deploys), then close the stream cleanly with `CloseSend`. This is what keeps restarts from adding latency to in-flight messages (the third reason listed at the top of this PR).
- Configurable ack behaviour per message outcome via `on_success` / `on_failure` (`:ack`, `:noop`, or `{:nack, seconds}`), keeping the same shape as the existing pull producer's API so users switching producers don't have to relearn it.
- N:N producer-manager topology: each Broadway producer has its own `StreamManager` (owning one StreamingPull stream) and shares a single `UnaryAckSupervisor` per producer, supervising `AckBatcher` and `UnaryRpcClient`. This scales horizontally: more producers means more streams to Google, each with its own flow control budget, which is the only way to get past the per-stream throughput ceiling.
- Pluggable HTTP/2 adapter (`:gun` default, `:mint`, or any module implementing `GRPC.Client.Adapter`). Both built-in adapters are provided by the `grpc` library. HTTP/2 PING keepalives are sent every 30s on the `:gun` adapter to keep Google's load balancer from closing idle connections, which it does after roughly 20s by default.
- Pluggable gRPC client behaviour (`BroadwayCloudPubSub.Streaming.Client`): the default `GrpcClient` wraps the `grpc` library, but it can be swapped out for tests or alternative transports without touching the producer.
- gRPC client-side interceptors (`interceptors` option) attached to both channels, useful for logging, tracing, custom auth, etc. Standard `GRPC.Client.Interceptor` behaviour.
- Pluggable auth via `goth` / `token_generator` (same MFArgs API as the existing pull producer), with the token re-fetched on every (re)connection so short-lived tokens don't expire on a long-lived stream.
- Pub/Sub emulator support via `grpc_endpoint` and `use_ssl` (set to `"localhost:8085"` and `false` respectively to point at the local emulator, the same setup we use for the integration tests).
- Telemetry events that allow for great observability: stream lifecycle (connect, disconnect, reconnect, drain), ack batch flush timings and sizes, unary RPC spans, lease extension cycles, outstanding/buffered message gauges, pending demand and drain status. The `telemetry_metadata` option lets users attach static or dynamic (MFA) extra metadata to every event, so the events plug straight into Datadog / Prometheus / etc. without per-call boilerplate.
- Minimal generated protobuf surface: the proto generation is filtered down to the three RPCs we actually use (`StreamingPull`, `Acknowledge`, `ModifyAckDeadline`), going from 124 generated modules / 2647 lines down to 12 modules / 265 lines. The full `Subscriber` service has 17 RPCs plus `Publisher` and `SchemaService`, and pulling all of them in would bloat compile times and conflict with the `googleapis` Hex dep's `google.api.*` annotation modules. Reproducible via `buf generate`.

I know that it's a big PR, but I would love to start the conversation and the review. If we agree to have it in the library, I would wait until grpc 1.0 is released, but the PR can already be reviewed; it uses the release candidate at the moment.
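To illustrate the modAck batching item from the feature list above, here is a minimal, self-contained sketch of grouping pending deadline modifications by deadline and splitting each group at the per-RPC limit. It is not the PR's `AckBatcher` code: the `ModAckGrouping` module, its `batches/2` function and the `{ack_id, deadline_seconds}` input shape are all hypothetical and only show the idea.

```elixir
defmodule ModAckGrouping do
  @moduledoc """
  Illustrative only: groups pending modAck requests by deadline and splits
  each group into chunks that would each fit in one ModifyAckDeadline RPC.
  """

  # Google API limit of ack_ids per RPC, as quoted in the feature list.
  @max_ack_ids_per_rpc 2_500

  @doc """
  Takes a list of `{ack_id, deadline_seconds}` tuples and returns a list of
  `{deadline_seconds, ack_ids}` batches, one per RPC that would be sent.
  """
  def batches(pending, max_size \\ @max_ack_ids_per_rpc) do
    pending
    # One ModifyAckDeadline RPC carries a single deadline, so group by it.
    |> Enum.group_by(
      fn {_ack_id, deadline} -> deadline end,
      fn {ack_id, _deadline} -> ack_id end
    )
    # Sort by deadline only to make the output order deterministic.
    |> Enum.sort_by(fn {deadline, _ack_ids} -> deadline end)
    # Split oversized groups so no batch exceeds the per-RPC limit.
    |> Enum.flat_map(fn {deadline, ack_ids} ->
      ack_ids
      |> Enum.chunk_every(max_size)
      |> Enum.map(fn chunk -> {deadline, chunk} end)
    end)
  end
end
```

For example, `ModAckGrouping.batches([{"a", 10}, {"b", 30}, {"c", 10}])` returns `[{10, ["a", "c"]}, {30, ["b"]}]`: two RPCs instead of three, because the two ack_ids that share a 10-second deadline travel together.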
I've been working on this for more than a couple of months and we've been running it in production for some weeks now. I can privately share Datadog screenshots showing how it performs and the potential of the telemetry events. I also have integration and stress tests that require the emulator (or real Pub/Sub); I haven't included them in the PR but I can share them.
On top of the PR itself, I have a few internal documents that didn't seem worth pushing to the branch but that I'm happy to share if useful. They capture the non-obvious architectural rationale (Mint stream cancellation quirks, reconnect deduplication, exactly-once receipt gating, why we don't dedupe ack_ids intra-stream, etc.), a comparison of the official Go / Java / Python / Node.js clients that informed a lot of the defaults, and an architecture write-up of how the supervision tree, stream manager and ack pipeline fit together. Just let me know and I'll send them over, or inline whatever bits would help the review.