Skip to content

Commit bd69137

Browse files
authored
feat: Redis connector (#213)
# Summary Adds a Redis backed Connector implementation. # Changes - Adds `RedisChannel` and `RedisConnector` for async messaging via Redis. - Updates tests to cover Redis implementations. - Adds an optional `redis` dependency group.
1 parent 4b1efe2 commit bd69137

16 files changed

Lines changed: 1650 additions & 1298 deletions

.github/workflows/lint-test.yaml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: Lint and test
33
on:
44
push:
55
branches:
6-
- 'main'
6+
- "main"
77
pull_request:
88
types:
99
- opened
@@ -137,13 +137,14 @@ jobs:
137137

138138
- name: Run backing services
139139
run: |
140-
docker compose up postgres rabbitmq -d
140+
docker compose up -d
141141
sleep 10 # Wait for services to start
142142
143143
- name: Run integration tests
144144
run: COVERAGE_FILE=.coverage.py${{ matrix.python_version }}.integration uv run coverage run -m pytest ./tests/integration/ -m "not tuner"
145145
env:
146146
RABBITMQ_URL: amqp://user:password@localhost:5672/
147+
REDIS_URL: redis://default:password@localhost:6379/
147148
RAY_ENABLE_UV_RUN_RUNTIME_ENV: 0
148149
PLUGBOARD_IO_READ_TIMEOUT: 5.0
149150

@@ -153,7 +154,7 @@ jobs:
153154
name: coverage-integration-py${{ matrix.python_version }}
154155
include-hidden-files: true
155156
path: .coverage.py${{ matrix.python_version }}.integration*
156-
157+
157158
test-integration-tuner:
158159
name: Tests - integration:tuner
159160
runs-on: ubuntu-latest
@@ -183,6 +184,7 @@ jobs:
183184
run: COVERAGE_FILE=.coverage.py${{ matrix.python_version }}.integration.tuner uv run coverage run -m pytest ./tests/integration/ -m "tuner"
184185
env:
185186
RABBITMQ_URL: amqp://user:password@localhost:5672/
187+
REDIS_URL: redis://default:password@localhost:6379/
186188
RAY_ENABLE_UV_RUN_RUNTIME_ENV: 0
187189
PLUGBOARD_IO_READ_TIMEOUT: 5.0
188190

compose.yaml

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ name: plugboard
22

33
services:
44
rabbitmq:
5-
image: rabbitmq:4.0-rc-management-alpine
5+
image: rabbitmq:4.2-management-alpine
66
container_name: rabbitmq
77
ports:
88
- 5672:5672
@@ -17,7 +17,7 @@ services:
1717
restart: always
1818

1919
postgres:
20-
image: postgres:18-alpine
20+
image: postgres:18.2-alpine
2121
container_name: postgres
2222
environment:
2323
- POSTGRES_USER=plugboard
@@ -31,14 +31,16 @@ services:
3131
- main
3232
restart: always
3333

34-
valkey:
35-
image: valkey/valkey:8.0-alpine
36-
container_name: valkey
37-
command: valkey-server --dir /var/lib/valkey --bind 0.0.0.0 -::1 --protected-mode no
34+
redis:
35+
image: redis:8.6-alpine
36+
container_name: redis
37+
environment:
38+
- REDIS_PASSWORD=password
39+
command: redis-server --bind 0.0.0.0 --requirepass password
3840
ports:
3941
- 6379:6379
4042
volumes:
41-
- valkey-data:/var/lib/valkey
43+
- redis-data:/data
4244
networks:
4345
- main
4446
restart: always
@@ -50,4 +52,4 @@ networks:
5052
volumes:
5153
rabbitmq-data:
5254
postgres-data:
53-
valkey-data:
55+
redis-data:

docs/api/connector/connector.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,7 @@
1111
- RabbitMQChannel
1212
- RayConnector
1313
- RayChannel
14+
- RedisConnector
15+
- RedisChannel
1416
- ZMQConnector
1517
- ZMQChannel

docs/usage/configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ Plugboard can make use of a message broker for data exchange between components
2222
| Option Name | Description | Default Value |
2323
|----------------------------|----------------------------------------------------------|---------------|
2424
| `RABBITMQ_URL` | URL for RabbitMQ AMQP message broker (must include credentials if required) | |
25+
| `REDIS_URL` | URL for Redis message broker (must include credentials if required) | |
2526

2627
## Job ID
2728

plugboard/connector/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from plugboard.connector.connector_builder import ConnectorBuilder
77
from plugboard.connector.rabbitmq_channel import RabbitMQChannel, RabbitMQConnector
88
from plugboard.connector.ray_channel import RayChannel, RayConnector
9+
from plugboard.connector.redis_channel import RedisChannel, RedisConnector
910
from plugboard.connector.serde_channel import SerdeChannel
1011
from plugboard.connector.zmq_channel import ZMQChannel, ZMQConnector
1112

@@ -20,6 +21,8 @@
2021
"RabbitMQConnector",
2122
"RayChannel",
2223
"RayConnector",
24+
"RedisChannel",
25+
"RedisConnector",
2326
"SerdeChannel",
2427
"ZMQChannel",
2528
"ZMQConnector",

plugboard/connector/rabbitmq_channel.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,14 @@ def __init__(self, *args: _t.Any, **kwargs: _t.Any) -> None:
118118
self._recv_channel: _t.Optional[RabbitMQChannel] = None
119119
self._recv_channel_lock = asyncio.Lock()
120120

121-
def __getstate__(self) -> dict:
121+
def __getstate__(self) -> dict: # pragma: no cover
122122
state = self.__dict__.copy()
123123
for attr in ("_send_channel", "_send_channel_lock", "_recv_channel", "_recv_channel_lock"):
124124
if attr in state:
125125
del state[attr]
126126
return state
127127

128-
def __setstate__(self, state: dict) -> None:
128+
def __setstate__(self, state: dict) -> None: # pragma: no cover
129129
self.__dict__.update(state)
130130
self._send_channel = None
131131
self._send_channel_lock = asyncio.Lock()
@@ -134,9 +134,13 @@ def __setstate__(self, state: dict) -> None:
134134

135135
@inject
136136
async def connect_send(
137-
self, rabbitmq_conn: AbstractRobustConnection = Provide[DI.rabbitmq_conn]
137+
self, rabbitmq_conn: AbstractRobustConnection | None = Provide[DI.rabbitmq_conn]
138138
) -> RabbitMQChannel:
139139
"""Returns a `RabbitMQ` channel for sending messages."""
140+
if rabbitmq_conn is None:
141+
raise RuntimeError(
142+
"RabbitMQ connection not available. Ensure RabbitMQ URL is configured."
143+
)
140144
async with self._send_channel_lock:
141145
if self._send_channel is not None:
142146
return self._send_channel
@@ -150,9 +154,13 @@ async def connect_send(
150154

151155
@inject
152156
async def connect_recv(
153-
self, rabbitmq_conn: AbstractRobustConnection = Provide[DI.rabbitmq_conn]
157+
self, rabbitmq_conn: AbstractRobustConnection | None = Provide[DI.rabbitmq_conn]
154158
) -> RabbitMQChannel:
155159
"""Returns a `RabbitMQ` channel for receiving messages."""
160+
if rabbitmq_conn is None:
161+
raise RuntimeError(
162+
"RabbitMQ connection not available. Ensure RabbitMQ URL is configured."
163+
)
156164
cm = self._recv_channel_lock if self.spec.mode != ConnectorMode.PUBSUB else nullcontext()
157165
async with cm:
158166
if self._recv_channel is not None:
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
"""Provides RedisChannel and RedisConnector."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import typing as _t
7+
8+
from plugboard_schemas.connector import ConnectorMode
9+
from that_depends import Provide, inject
10+
11+
from plugboard.connector.connector import Connector
12+
from plugboard.connector.serde_channel import SerdeChannel
13+
from plugboard.exceptions import ChannelClosedError
14+
from plugboard.utils import DI, depends_on_optional
15+
16+
17+
try:
18+
from redis.asyncio import Redis
19+
from redis.asyncio.client import PubSub
20+
except ImportError: # pragma: no cover
21+
pass
22+
23+
24+
class RedisChannel(SerdeChannel):
25+
"""`RedisChannel` for sending and receiving messages via Redis."""
26+
27+
@depends_on_optional("redis")
28+
def __init__(
29+
self,
30+
*args: _t.Any,
31+
key: str,
32+
send_fn: _t.Optional[_t.Callable[[bytes], _t.Awaitable[None]]] = None,
33+
recv_fn: _t.Optional[_t.Callable[[], _t.Awaitable[bytes]]] = None,
34+
pubsub: _t.Optional[PubSub] = None,
35+
**kwargs: _t.Any,
36+
) -> None:
37+
"""Instantiates a `RedisChannel`.
38+
39+
Uses Redis to provide communication between components on different processes.
40+
Requires a Redis server to be running with the URL set in the `REDIS_URL`
41+
environment variable.
42+
43+
Args:
44+
key: The Redis key for the channel.
45+
send_fn: Optional; A callable for sending messages to the Redis channel.
46+
recv_fn: Optional; A callable for receiving messages from the Redis channel.
47+
pubsub: Optional; The Redis `PubSub` instance, used in pub-sub mode.
48+
"""
49+
super().__init__(*args, **kwargs)
50+
self._key = key
51+
self._send_fn = send_fn
52+
self._recv_fn = recv_fn
53+
self._pubsub = pubsub
54+
55+
# Set initial state based on intended usage
56+
self._is_send_closed = send_fn is None
57+
self._is_recv_closed = recv_fn is None
58+
59+
async def send(self, msg: bytes) -> None:
60+
"""Send a message to the Redis channel."""
61+
if self._is_send_closed or self._send_fn is None:
62+
raise ChannelClosedError("Channel is closed for sending")
63+
await self._send_fn(msg)
64+
65+
async def recv(self) -> bytes:
66+
"""Receive a message from the Redis channel."""
67+
if self._is_recv_closed or self._recv_fn is None:
68+
raise ChannelClosedError("Channel is closed for receiving")
69+
return await self._recv_fn()
70+
71+
async def close(self) -> None:
72+
"""Closes the `RedisChannel`."""
73+
# If we are a sender, send the close message (via super().close())
74+
if not self._is_send_closed:
75+
await super().close()
76+
self._is_send_closed = True
77+
78+
if self._pubsub is not None:
79+
await self._pubsub.unsubscribe()
80+
await self._pubsub.close()
81+
self._pubsub = None
82+
83+
self._is_recv_closed = True
84+
85+
86+
class RedisConnector(Connector):
87+
"""`RedisConnector` connects components via Redis."""
88+
89+
@depends_on_optional("redis")
90+
def __init__(self, *args: _t.Any, **kwargs: _t.Any) -> None:
91+
"""Instantiates a `RedisConnector`.
92+
93+
Uses Redis to connect components via either pipeline (list-based) or pub-sub
94+
(channel-based) mode. Requires a Redis server to be running with the URL set
95+
in the `REDIS_URL` environment variable.
96+
"""
97+
super().__init__(*args, **kwargs)
98+
self._topic: str = (
99+
str(self.spec.source) if self.spec.mode == ConnectorMode.PUBSUB else self.spec.id
100+
)
101+
self._send_channel: _t.Optional[RedisChannel] = None
102+
self._send_channel_lock = asyncio.Lock()
103+
self._recv_channel: _t.Optional[RedisChannel] = None
104+
self._recv_channel_lock = asyncio.Lock()
105+
106+
def __getstate__(self) -> dict: # pragma: no cover
107+
state = self.__dict__.copy()
108+
for attr in ("_send_channel", "_recv_channel", "_send_channel_lock", "_recv_channel_lock"):
109+
if attr in state:
110+
del state[attr]
111+
return state
112+
113+
def __setstate__(self, state: dict) -> None: # pragma: no cover
114+
self.__dict__.update(state)
115+
self._send_channel = None
116+
self._send_channel_lock = asyncio.Lock()
117+
self._recv_channel = None
118+
self._recv_channel_lock = asyncio.Lock()
119+
120+
@inject
121+
async def _get_key(self, job_id: str = Provide[DI.job_id]) -> str:
122+
return f"{job_id}.{self._topic}"
123+
124+
@inject
125+
async def connect_send(
126+
self, redis_client: Redis | None = Provide[DI.redis_client]
127+
) -> RedisChannel:
128+
"""Returns a `RedisChannel` for sending messages."""
129+
if redis_client is None:
130+
raise RuntimeError("Redis client not available. Ensure Redis URL is configured.")
131+
async with self._send_channel_lock:
132+
if self._send_channel is not None:
133+
return self._send_channel
134+
135+
key = await self._get_key()
136+
send_fn = self._build_send_fn(redis_client, key)
137+
self._send_channel = RedisChannel(key=key, send_fn=send_fn)
138+
return self._send_channel
139+
140+
def _build_send_fn(
141+
self, redis_client: Redis, key: str
142+
) -> _t.Callable[[bytes], _t.Awaitable[None]]:
143+
if self.spec.mode == ConnectorMode.PIPELINE:
144+
145+
async def send_fn(msg: bytes) -> None:
146+
await redis_client.lpush(key, msg) # type: ignore[misc]
147+
else:
148+
149+
async def send_fn(msg: bytes) -> None:
150+
await redis_client.publish(key, msg)
151+
152+
return send_fn
153+
154+
@inject
155+
async def connect_recv(
156+
self, redis_client: Redis | None = Provide[DI.redis_client]
157+
) -> RedisChannel:
158+
"""Returns a `RedisChannel` for receiving messages."""
159+
if redis_client is None:
160+
raise RuntimeError("Redis client not available. Ensure Redis URL is configured.")
161+
key = await self._get_key()
162+
if self.spec.mode == ConnectorMode.PIPELINE:
163+
async with self._recv_channel_lock:
164+
if self._recv_channel is not None:
165+
return self._recv_channel
166+
recv_fn = self._build_recv_fn(redis_client, key)
167+
channel = RedisChannel(key=key, recv_fn=recv_fn)
168+
self._recv_channel = channel
169+
else: # ConnectorMode.PUBSUB
170+
pubsub = redis_client.pubsub(ignore_subscribe_messages=True)
171+
await pubsub.subscribe(key)
172+
recv_fn = self._build_recv_fn(redis_client, key, pubsub=pubsub)
173+
channel = RedisChannel(key=key, recv_fn=recv_fn, pubsub=pubsub)
174+
return channel
175+
176+
def _build_recv_fn(
177+
self, redis_client: Redis, key: str, pubsub: _t.Optional[PubSub] = None
178+
) -> _t.Callable[[], _t.Awaitable[bytes]]:
179+
if self.spec.mode == ConnectorMode.PIPELINE:
180+
181+
async def recv_fn() -> bytes:
182+
result = await redis_client.brpop([key], timeout=None) # type: ignore[misc]
183+
return result[1]
184+
else:
185+
if pubsub is None:
186+
raise ValueError("PubSub instance required for PUBSUB mode")
187+
188+
async def recv_fn() -> bytes:
189+
# NOTE : We use `listen()` here due to non-sensical `get_message()` behaviour with
190+
# : `ignore_subscribe_messages=True`.
191+
# : See: https://github.com/redis/redis-py/issues/733#issuecomment-1956647495
192+
message = await asyncio.wait_for(anext(pubsub.listen()), timeout=None)
193+
return message["data"]
194+
195+
return recv_fn

0 commit comments

Comments
 (0)