support redis cluster transport #2204
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2204 +/- ##
==========================================
+ Coverage 81.60% 81.89% +0.28%
==========================================
Files 77 78 +1
Lines 9540 9963 +423
Branches 1162 1238 +76
==========================================
+ Hits 7785 8159 +374
- Misses 1563 1591 +28
- Partials 192 213 +21 ☔ View full report in Codecov by Sentry. |
dca3e75 to
9983557
Compare
f4d275a to
f5c7356
Compare
c87be51 to
6986510
Compare
|
Only one test is failing, due to connection failure |
|
Check linter error :) |
|
Sorry for the failure, I will fix it and improve the test coverage. |
|
I am following it and already had reviewed it twice. Will have an in depth review again tomorrow. No worries. Thanks for picking my work |
Added docs for kombu.transport.rediscluster, now I'm sure there won't be problems anymore |
Nusnus
left a comment
There was a problem hiding this comment.
Finally that everything passes on the CI, we can properly review the code.
That being said, we’re currently in a release phase so we can’t merge it until we complete the release, after the new year holidays.
Good work and thank you for fixing everything.
|
Would this also support Redis cluster for backend? |
|
|
||
| conn_params['connection_pool_class'] = ManagedConnectionPool | ||
|
|
||
| conn_params['url'] = f'redis://{conn_params["host"]}:{conn_params["port"]}' |
There was a problem hiding this comment.
@zs-neo thanks for all of your work on this! I'm testing out this PR right now and noticed that TLS support is broken. it looks as though it might be as simple as using the rediss scheme here with the ssl logic above?
There was a problem hiding this comment.
I'm very sorry for the late reply. Thank you for your attention to this PR! I agree with you and I will add support and testing for TLS.
|
It would be nice to see this land sooon as I'm trying to use AWS serverless valkey and it runs in a cluster mode so Celery won't run on it. It's going to become more common for people to want to run their projects on serverless versions since they tend to offer scalable storage and not a fixed amount like you would get when you use a specific instance type. I guess my workaround for now will be to spin up a normal version just for celery to use. |
for more information, see https://pre-commit.ci
Co-authored-by: bashir-abdelwahed <51696104+bashir-abdelwahed@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
5e5de4c to
134a2bb
Compare
There was a problem hiding this comment.
Pull Request Overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
kombu/transport/rediscluster.py:1
- Typo in variable name: 'disconect' should be 'disconnect'
"""Redis cluster transport module for Kombu.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
|
Hello team, any update on this one? Looks like it's very close to get shipped to next release? |
|
@auvipy Looks like it's a documentation issue, is it transient, or is there something else needed? |
|
this isslated for v5.7.0 release |
auvipy
left a comment
There was a problem hiding this comment.
=================================== FAILURES ===================================
_____________________ test_Channel.test_prefixed_pipeline ______________________
self = <t.unit.transport.test_rediscluster.test_Channel object at 0x7f074f0af7c0>
mock_initialize =
mock_execute_command =
@patch("redis.cluster.RedisCluster.execute_command")
@patch('redis.cluster.NodesManager.initialize')
def test_prefixed_pipeline(self, mock_initialize, mock_execute_command):
client = redis.PrefixedStrictRedis(global_keyprefix='foo_', startup_nodes=[ClusterNode()])
pipeline = client.pipeline()
t/unit/transport/test_rediscluster.py:614:
self = <kombu.transport.rediscluster.PrefixedStrictRedis object at 0x7f0736200ca0>
transaction = False, shard_hint = None
Restoring 2 unacknowledged message(s)
def pipeline(self, transaction=False, shard_hint=None):
if shard_hint:
raise RedisClusterException("shard_hint is deprecated in cluster mode")
if transaction:
raise RedisClusterException("transaction is deprecated in cluster mode")
return PrefixedRedisPipeline(
nodes_manager=self.nodes_manager,
commands_parser=self.commands_parser,
startup_nodes=self.nodes_manager.startup_nodes,
result_callbacks=self.result_callbacks,
cluster_response_callbacks=self.cluster_response_callbacks,
cluster_error_retry_attempts=self.cluster_error_retry_attempts,
read_from_replicas=self.read_from_replicas,
reinitialize_steps=self.reinitialize_steps,
lock=self._lock,
global_keyprefix=self.global_keyprefix,
)
E AttributeError: 'PrefixedStrictRedis' object has no attribute 'cluster_error_retry_attempts'
kombu/transport/rediscluster.py:133: AttributeError
=============================== warnings summary ===============================
.tox/3.9-unit/lib/python3.9/site-packages/coverage/inorout.py:473
/home/runner/_work/kombu/kombu/.tox/3.9-unit/lib/python3.9/site-packages/coverage/inorout.py:473: CoverageWarning: --include is ignored because --source is set (include-ignored); see https://coverage.readthedocs.io/en/7.10.7/messages.html#warning-include-ignored
self.warn("--include is ignored because --source is set", slug="include-ignored")
.tox/3.9-unit/lib/python3.9/site-packages/google/api_core/_python_version_support.py:252
/home/runner/_work/kombu/kombu/.tox/3.9-unit/lib/python3.9/site-packages/google/api_core/_python_version_support.py:252: FutureWarning: You are using a Python version (3.9.23) past its end of life. Google will update google.api_core with critical bug fixes on a best-effort basis, but not with any other fixes or features. Please upgrade to the latest Python version, or at least Python 3.10, and then update google.api_core.
warnings.warn(message, FutureWarning)
|
Are there any updates on this? |
|
no, probably need a new taker. @MehrazRumman can you take this please? and come with a new PR on a new branch? |


Attempt to address #1021
Thank you very much for your code, it helps us a lot. @auvipy
We use redis-py instead of redis-py-cluster because redis-py-cluster has been merged into redis-py.
Celery works fine on our cluster with multi producers and multi consumers, when a node goes down, it can automatically switch.