def oauth_cb(oauth_config):
region = os.getenv('AWS_REGION') or os.getenv('AWS_DEFAULT_REGION') or 'eu-west-1'
auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(region)
return auth_token, expiry_ms / 1000 - 60
...
conf = {
'bootstrap.servers': broker_url,
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
'oauth_cb': oauth_cb,
"error_cb": error_cb,
'key.serializer': StringSerializer('utf_8'),
'value.serializer': json_serializer,
'socket.keepalive.enable': True,
}
return SerializingProducer(conf)
The same code works with 2.12.2.
Possible cause #2130 ?