Wire sasl_oauth_token_provider into aiokafka client and KarapaceProducer#1277
Open
dstorey-swish wants to merge 2 commits into
Open
Wire sasl_oauth_token_provider into aiokafka client and KarapaceProducer#1277dstorey-swish wants to merge 2 commits into
dstorey-swish wants to merge 2 commits into
Conversation
Contributor
|
@muralibasani - can i get some eyes on this and a bump so it can get into the CI/CD runner ? |
3602df3 to
8b1f7c4
Compare
Contributor
|
@dylanbstorey pls check failing ci lint |
Contributor
|
@dylanbstorey can you pls rebase again ? |
42c69a3 to
d9c58c8
Compare
PR Aiven-Open#1226 added `sasl_oauth_token_provider_class` config and wired it into the three confluent-kafka client factories in `kafka_utils`, plus the schema_reader's internal factories. Two more sites construct Kafka clients independently and were missed: - `master_coordinator.MasterCoordinator.init_kafka_client` constructs an `aiokafka.AIOKafkaClient`. aiokafka uses its own `aiokafka.abc.AbstractTokenProvider` protocol — async-first — rather than the librdkafka-flavored `TokenWithExpiryProvider`. Added a small adapter so a single user-configured provider works for both libraries. Without this fix, configuring OAUTHBEARER causes aiokafka to raise `ValueError: sasl_oauth_token_provider needs to be provided implementing aiokafka.abc.AbstractTokenProvider`, master election never completes, and writes return 500 "No master set". - `messaging.KarapaceProducer.initialize_karapace_producer` constructs its own KafkaProducer rather than going through `kafka_utils.kafka_producer_from_config`. Without the provider passed here, OAUTHBEARER deployments stall at write time with TimeoutError on the producer's SASL handshake even after master election succeeds. The new `TestMasterCoordinatorPassthrough` and `TestKarapaceProducerPassthrough` test classes mirror the existing `TestKafkaUtilsPassthrough` / `TestSchemaReaderPassthrough` style: construct the component with a stub provider, mock the underlying client class, assert the kwargs include the provider (and that the master-coordinator path wraps it in an `AbstractTokenProvider`). Verified end-to-end against a Kafka broker with the unsecured OAUTHBEARER handlers: schema register + read round-trips succeed; the configured TokenProvider is invoked from all three SASL paths (schema reader, master coordinator, producer).
`_sasl_oauth_token_provider` is stored as `object` on Config but is always validated as a `TokenWithExpiryProvider` on construction. Narrowing the return type via `cast` lets mypy accept the argument to `_AiokafkaTokenAdapter`, resolving the type-check CI failure.
d9c58c8 to
ad43123
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
PR #1226 added
sasl_oauth_token_provider_classconfig and wired it into the three confluent-kafka client factories inkarapace.core.kafka_utils, plus the schema_reader's internal factories. Two more sites construct Kafka clients independently and were missed; this PR fixes them.master_coordinator.MasterCoordinator.init_kafka_clientConstructs an
aiokafka.AIOKafkaClient. aiokafka uses its ownaiokafka.abc.AbstractTokenProviderprotocol — async-first — rather than the librdkafka-flavoredTokenWithExpiryProviderused by confluent-kafka clients. This PR adds a small_AiokafkaTokenAdapterso a single user-configured provider works for both libraries.Without this fix, configuring OAUTHBEARER causes aiokafka to raise:
…master election never completes, and writes return 500 "No master set".
messaging.KarapaceProducer.initialize_karapace_producerConstructs its own
KafkaProducerrather than going throughkafka_utils.kafka_producer_from_config. Without the provider passed here, OAUTHBEARER deployments stall at write time withTimeoutErroron the producer's SASL handshake even after master election succeeds. Pattern is identical to the kafka_utils factories: consultget_oauth_token_provider, conditionally pass.Test coverage
tests/unit/test_oauth_token_provider.pyadds two new test classes mirroring the existingTestKafkaUtilsPassthrough/TestSchemaReaderPassthroughstyle:TestMasterCoordinatorPassthroughtest_aiokafka_client_receives_wrapped_providertest_aiokafka_client_omits_provider_when_not_configuredtest_adapter_returns_token_stringTestKarapaceProducerPassthroughtest_producer_receives_providertest_producer_omits_provider_when_not_configuredAll 21 tests in the file pass.
End-to-end verification
Verified against a Kafka 3.7.1 broker configured with the unsecured OAUTHBEARER handlers (
OAuthBearerUnsecuredValidatorCallbackHandler/OAuthBearerUnsecuredLoginCallbackHandler):TokenProvideris invoked from all three SASL paths: schema_reader (confluent-kafka), master_coordinator (aiokafka), and KarapaceProducer (confluent-kafka)Authenticated via OAUTHBEARER, master election completes,_schemasis created, writes go throughTest plan
pytest tests/unit/test_oauth_token_provider.py -v— 21 passedNote
A future improvement worth considering: a single integration test that stands up an OAUTHBEARER-secured Kafka and exercises Karapace end-to-end would prevent this whole class of regression. The unit tests here are scoped per call site, which means any new
KafkaProducer/KafkaConsumer/AIOKafkaClientinstantiation would silently lack the wiring until a similar end-to-end test catches it.