Skip to content

Wire sasl_oauth_token_provider into aiokafka client and KarapaceProducer#1277

Open
dstorey-swish wants to merge 2 commits into
Aiven-Open:mainfrom
dstorey-swish:fix/aiokafka-oauth-token-provider
Open

Wire sasl_oauth_token_provider into aiokafka client and KarapaceProducer#1277
dstorey-swish wants to merge 2 commits into
Aiven-Open:mainfrom
dstorey-swish:fix/aiokafka-oauth-token-provider

Conversation

@dstorey-swish
Copy link
Copy Markdown

@dstorey-swish dstorey-swish commented May 8, 2026

Summary

PR #1226 added sasl_oauth_token_provider_class config and wired it into the three confluent-kafka client factories in karapace.core.kafka_utils, plus the schema_reader's internal factories. Two more sites construct Kafka clients independently and were missed; this PR fixes them.

master_coordinator.MasterCoordinator.init_kafka_client

Constructs an aiokafka.AIOKafkaClient. aiokafka uses its own aiokafka.abc.AbstractTokenProvider protocol — async-first — rather than the librdkafka-flavored TokenWithExpiryProvider used by confluent-kafka clients. This PR adds a small _AiokafkaTokenAdapter so a single user-configured provider works for both libraries.

Without this fix, configuring OAUTHBEARER causes aiokafka to raise:

ValueError: sasl_oauth_token_provider needs to be provided implementing aiokafka.abc.AbstractTokenProvider

…master election never completes, and writes return 500 "No master set".

messaging.KarapaceProducer.initialize_karapace_producer

Constructs its own KafkaProducer rather than going through kafka_utils.kafka_producer_from_config. Without the provider passed here, OAUTHBEARER deployments stall at write time with TimeoutError on the producer's SASL handshake even after master election succeeds. Pattern is identical to the kafka_utils factories: consult get_oauth_token_provider, conditionally pass.

Test coverage

tests/unit/test_oauth_token_provider.py adds two new test classes mirroring the existing TestKafkaUtilsPassthrough / TestSchemaReaderPassthrough style:

  • TestMasterCoordinatorPassthrough
    • test_aiokafka_client_receives_wrapped_provider
    • test_aiokafka_client_omits_provider_when_not_configured
    • test_adapter_returns_token_string
  • TestKarapaceProducerPassthrough
    • test_producer_receives_provider
    • test_producer_omits_provider_when_not_configured

All 21 tests in the file pass.

End-to-end verification

Verified against a Kafka 3.7.1 broker configured with the unsecured OAUTHBEARER handlers (OAuthBearerUnsecuredValidatorCallbackHandler / OAuthBearerUnsecuredLoginCallbackHandler):

  • Schema register + read round-trip succeeds
  • The configured TokenProvider is invoked from all three SASL paths: schema_reader (confluent-kafka), master_coordinator (aiokafka), and KarapaceProducer (confluent-kafka)
  • aiokafka logs Authenticated via OAUTHBEARER, master election completes, _schemas is created, writes go through

Test plan

  • pytest tests/unit/test_oauth_token_provider.py -v — 21 passed
  • End-to-end OAUTHBEARER round-trip against an unsecured-JWS broker

Note

A future improvement worth considering: a single integration test that stands up an OAUTHBEARER-secured Kafka and exercises Karapace end-to-end would prevent this whole class of regression. The unit tests here are scoped per call site, which means any new KafkaProducer/KafkaConsumer/AIOKafkaClient instantiation would silently lack the wiring until a similar end-to-end test catches it.

@dstorey-swish dstorey-swish requested a review from a team as a code owner May 8, 2026 20:45
@dylanbstorey
Copy link
Copy Markdown
Contributor

@muralibasani - can i get some eyes on this and a bump so it can get into the CI/CD runner ?

@muralibasani muralibasani force-pushed the fix/aiokafka-oauth-token-provider branch from 3602df3 to 8b1f7c4 Compare May 15, 2026 13:52
@muralibasani
Copy link
Copy Markdown
Contributor

@dylanbstorey pls check failing ci lint

@muralibasani
Copy link
Copy Markdown
Contributor

@dylanbstorey can you pls rebase again ?

@muralibasani muralibasani force-pushed the fix/aiokafka-oauth-token-provider branch from 42c69a3 to d9c58c8 Compare May 19, 2026 15:34
PR Aiven-Open#1226 added `sasl_oauth_token_provider_class` config and wired it
into the three confluent-kafka client factories in `kafka_utils`, plus
the schema_reader's internal factories. Two more sites construct Kafka
clients independently and were missed:

- `master_coordinator.MasterCoordinator.init_kafka_client` constructs an
  `aiokafka.AIOKafkaClient`. aiokafka uses its own
  `aiokafka.abc.AbstractTokenProvider` protocol — async-first — rather
  than the librdkafka-flavored `TokenWithExpiryProvider`. Added a small
  adapter so a single user-configured provider works for both libraries.
  Without this fix, configuring OAUTHBEARER causes aiokafka to raise
  `ValueError: sasl_oauth_token_provider needs to be provided implementing
  aiokafka.abc.AbstractTokenProvider`, master election never completes,
  and writes return 500 "No master set".

- `messaging.KarapaceProducer.initialize_karapace_producer` constructs
  its own KafkaProducer rather than going through
  `kafka_utils.kafka_producer_from_config`. Without the provider passed
  here, OAUTHBEARER deployments stall at write time with TimeoutError on
  the producer's SASL handshake even after master election succeeds.

The new `TestMasterCoordinatorPassthrough` and
`TestKarapaceProducerPassthrough` test classes mirror the existing
`TestKafkaUtilsPassthrough` / `TestSchemaReaderPassthrough` style:
construct the component with a stub provider, mock the underlying client
class, assert the kwargs include the provider (and that the
master-coordinator path wraps it in an `AbstractTokenProvider`).

Verified end-to-end against a Kafka broker with the unsecured OAUTHBEARER
handlers: schema register + read round-trips succeed; the configured
TokenProvider is invoked from all three SASL paths (schema reader,
master coordinator, producer).
`_sasl_oauth_token_provider` is stored as `object` on Config but is
always validated as a `TokenWithExpiryProvider` on construction.
Narrowing the return type via `cast` lets mypy accept the argument to
`_AiokafkaTokenAdapter`, resolving the type-check CI failure.
@dstorey-swish dstorey-swish force-pushed the fix/aiokafka-oauth-token-provider branch from d9c58c8 to ad43123 Compare May 20, 2026 13:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants