Skip to content

Commit 70b9037

Browse files
committed
fix wildcard handling for protocols
Signed-off-by: Lance-Drane <Lance-Drane@users.noreply.github.com>
1 parent 32c76e9 commit 70b9037

7 files changed

Lines changed: 152 additions & 18 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
We follow [Common Changelog](https://common-changelog.org/) formatting for this document.
44

5+
## Unreleased
6+
7+
### Fixed
8+
9+
- Made support for wildcards in the channel parameter of `ControlPlaneManager.add_subscription_channel()` consistent across protocols
10+
511
## [0.9.1] - 2026-02-25
612

713
Initial reorganization of SDK packaging.

src/intersect_sdk_common/control_plane/brokers/amqp_client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def __init__(
9696
username: str,
9797
password: str,
9898
topics_to_handlers: Callable[[], dict[str, TopicHandler]],
99+
find_wildcard_handler: Callable[[str], TopicHandler | None],
99100
is_root: bool,
100101
) -> None:
101102
"""The default constructor.
@@ -106,6 +107,7 @@ def __init__(
106107
username: username credentials for AMQP broker
107108
password: password credentials for AMQP broker
108109
topics_to_handlers: callback function which gets the topic to handler map from the channel manager
110+
find_wildcard_handler: callback function which gets the wildcard topic handler from the channel manager
109111
is_root: Whether or not the client can configure exchanges and queues themselves (core services), or if this must be delegated to a Core Service (SDK Clients/Services)
110112
"""
111113
self._connection_params = pika.ConnectionParameters(
@@ -131,6 +133,8 @@ def __init__(
131133

132134
# Callback to the topics_to_handler list inside of
133135
self._topics_to_handlers = topics_to_handlers
136+
self._find_wildcard_handler = find_wildcard_handler
137+
134138
# mapping of topics to callables which can unsubscribe from the topic
135139
self._topics_to_consumer_tags: dict[str, _ConsumerTagInfo] = {}
136140
self._consumer_tags_to_threads: dict[str, threading.Thread] = {}
@@ -595,6 +599,9 @@ def _consume_message(
595599

596600
tth_key = _amqp_2_hierarchy(basic_deliver.routing_key)
597601
topic_handler = self._topics_to_handlers().get(tth_key)
602+
if not topic_handler:
603+
# we may have included the topic in one of our wildcard handlers
604+
topic_handler = self._find_wildcard_handler(tth_key)
598605
if topic_handler:
599606
consumer_tag_info = self._topics_to_consumer_tags.get(basic_deliver.routing_key)
600607
if not consumer_tag_info:

src/intersect_sdk_common/control_plane/brokers/mqtt_client.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@
3333
_MQTT_MAX_RETRIES = 10
3434

3535

36+
# TODO we should be handling hierarchy parts as a list of strings until they get to the client
37+
# this will be a breaking change, so only add it when ready to break
38+
def _hierarchy_2_mqtt(hierarchy: str) -> str:
39+
"""Take the hierarchy string format saved in the Service and map it to the MQTT topic format. Currently just covers wildcards."""
40+
return hierarchy.replace('#', '+')
41+
42+
3643
class MQTTClient(BrokerClient):
3744
"""Client for performing broker actions backed by a MQTT broker.
3845
@@ -55,6 +62,7 @@ def __init__(
5562
username: str,
5663
password: str,
5764
topics_to_handlers: Callable[[], dict[str, TopicHandler]],
65+
find_wildcard_handler: Callable[[str], TopicHandler | None],
5866
uid: str | None = None,
5967
) -> None:
6068
"""The default constructor.
@@ -65,6 +73,7 @@ def __init__(
6573
username: username credentials for MQTT broker
6674
password: password credentials for MQTT broker
6775
topics_to_handlers: callback function which gets the topic to handler map from the channel manager
76+
find_wildcard_handler: callback function which gets the wildcard topic handler from the channel manager
6877
uid: A string representing the unique id to identify the client.
6978
"""
7079
# Unique id for the MQTT broker to associate this client with
@@ -89,6 +98,7 @@ def __init__(
8998

9099
# ConnectionManager callable state
91100
self._topics_to_handlers = topics_to_handlers
101+
self._find_wildcard_handler = find_wildcard_handler
92102

93103
# MQTT v3.1.1 automatically downgrades a QOS which is too high (good), but MQTT v5 will terminate the connection (bad)
94104
# see https://github.com/rabbitmq/rabbitmq-server/discussions/11842
@@ -155,8 +165,9 @@ def publish(
155165
props = Properties(PacketTypes.PUBLISH) # type: ignore[no-untyped-call]
156166
props.ContentType = content_type
157167
props.UserProperty = list(headers.items())
168+
resolved_topic = _hierarchy_2_mqtt(topic)
158169
self._connection.publish(
159-
topic, payload, qos=self._max_supported_qos if persist else 0, properties=props
170+
resolved_topic, payload, qos=self._max_supported_qos if persist else 0, properties=props
160171
)
161172

162173
def subscribe(self, topic: str, persist: bool) -> None:
@@ -166,16 +177,18 @@ def subscribe(self, topic: str, persist: bool) -> None:
166177
topic: Topic to subscribe to.
167178
persist: Determine if the associated message queue of the topic is long-lived (True) or not (False)
168179
"""
180+
resolved_topic = _hierarchy_2_mqtt(topic)
169181
# NOTE: RabbitMQ only works with QOS of 1 and 0, and seems to convert QOS2 to QOS1
170-
self._connection.subscribe(topic, qos=2 if persist else 0, properties=None)
182+
self._connection.subscribe(resolved_topic, qos=2 if persist else 0, properties=None)
171183

172184
def unsubscribe(self, topic: str) -> None:
173185
"""Unsubscribe from a topic over the pre-existing connection.
174186
175187
Args:
176188
topic: Topic to unsubscribe from.
177189
"""
178-
self._connection.unsubscribe(topic)
190+
resolved_topic = _hierarchy_2_mqtt(topic)
191+
self._connection.unsubscribe(resolved_topic)
179192

180193
def _on_message(
181194
self,
@@ -190,7 +203,11 @@ def _on_message(
190203
userdata: MQTT user data
191204
message: MQTT message
192205
"""
206+
# NOTE: should not need to convert the MQTT topic to the protocol agnostic representation, as we only change the wildcard format (which won't show up as the message topic)
193207
topic_handler = self._topics_to_handlers().get(message.topic)
208+
if not topic_handler:
209+
# we may have included the topic in one of our wildcard handlers
210+
topic_handler = self._find_wildcard_handler(message.topic)
194211
# Note that if we return prior to the callback, there will be no reply message
195212
if not topic_handler:
196213
logger.warning('Incompatible message topic %s, rejecting message', message.topic)

src/intersect_sdk_common/control_plane/control_plane_manager.py

Lines changed: 59 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,30 @@
11
"""Wrapper around interacting with the control plane, which may be multiple brokers."""
22

3+
import re
34
from collections.abc import Callable
45

56
from ..config import ControlPlaneConfig
7+
from ..exceptions import IntersectSetupError
68
from ..logger import logger
79
from .brokers.broker_client import BrokerClient
810
from .definitions import MessageCallback
911
from .topic_handler import TopicHandler
1012

13+
_CHANNEL_REGEX = re.compile(r'^[a-zA-Z0-9*/-]+[a-zA-Z0-9*#/-]$')
14+
"""Allow for these patterns:
15+
16+
- alphanumeric characters
17+
- hyphens
18+
- `/` (topic separator; note that this is '.' on the broker)
19+
- `*` (wildcard for exactly one word)
20+
- `#` (wildcard for any number of words, but can ONLY appear at the end of the topic string, see section 4.7.1.2 at https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718107)
21+
"""
22+
1123

1224
def _create_control_provider(
1325
config: ControlPlaneConfig,
1426
topic_handler_callback: Callable[[], dict[str, TopicHandler]],
27+
find_wildcard_handler: Callable[[str], TopicHandler | None],
1528
) -> BrokerClient:
1629
if config.protocol == 'amqp0.9.1':
1730
from .brokers.amqp_client import ( # noqa: PLC0415 (lazy load all AMQP modules)
@@ -24,6 +37,7 @@ def _create_control_provider(
2437
username=config.username,
2538
password=config.password,
2639
topics_to_handlers=topic_handler_callback,
40+
find_wildcard_handler=find_wildcard_handler,
2741
is_root=config.is_root,
2842
)
2943

@@ -36,6 +50,7 @@ def _create_control_provider(
3650
username=config.username,
3751
password=config.password,
3852
topics_to_handlers=topic_handler_callback,
53+
find_wildcard_handler=find_wildcard_handler,
3954
)
4055

4156

@@ -53,38 +68,53 @@ def __init__(
5368
queue_name_generator should be a hardcoded value for Core Services, the SDK should provide its own function to generate queue names.
5469
"""
5570
self._control_providers = [
56-
_create_control_provider(config, self.get_subscription_channels)
71+
_create_control_provider(
72+
config, self.get_subscription_channels, self.get_wildcard_topic_handler
73+
)
5774
for config in control_configs
5875
]
5976

6077
# flag which indicates if we SHOULD be connected.
6178
self._ready = False
6279
# topics_to_handlers are managed here and transcend connections/disconnections to the broker
6380
self._topics_to_handlers: dict[str, TopicHandler] = {}
81+
self._wildcards: dict[str, TopicHandler] = {}
6482

6583
def add_subscription_channel(
6684
self, channel: str, callbacks: set[MessageCallback], persist: bool, queue_name: str
6785
) -> None:
6886
"""Start listening for messages on a channel on all configured brokers.
6987
70-
Use the format ${TOPIC}/${TOPIC}/${TOPIC} ... for the channel name, protocol implementations
71-
are responsible for converting the string to a valid channel.
88+
The provided channel value should be PROTOCOL AGNOSTIC. Use the following symbols when constructing a topic:
89+
- `/` - topic separator
90+
- `*` - exactly one word
91+
- `#` - any number of words (however, this can only be used as the LAST character)
92+
93+
Protocol implementations are responsible for converting the string to a valid channel.
7294
7395
This function should usually only be called before you've connected,
7496
but it's okay to call it after connecting. (Mostly used for Clients.)
7597
7698
Params:
77-
channel: string of the channel which we should start listening to
99+
channel: string of the channel which we should start listening to, remember to follow the above protocol agnostic convention
78100
callbacks: functions to call on subscribing to a message
79101
persist: if True, expect the associated message queue to live long; if False, it will only live the duration of the application.
80102
Any queue associated with a Service should always set this to True. Clients will need to subscribe to their own, temporary queues, and should set this to False.
81103
queue_name: the name of the queue to subscribe to. This is generally hardcoded for Core Services, but autogenerated for SDK Services and Clients.
82104
"""
83-
topic_handler = self._topics_to_handlers.get(channel)
105+
# this currently follows the MQTT/AMQP convention for wildcards
106+
# '*' = exactly one word (AMQP), '+' = exactly one word (MQTT), '#' = any number of words (AMQP and MQTT)
107+
if not _CHANNEL_REGEX.match(channel):
108+
msg = f'Channel {channel} is not valid, it must be alphanumeric and can only contain the following special characters: * # / -'
109+
raise IntersectSetupError(msg)
110+
is_wildcard = '*' in channel or channel.endswith('#')
111+
dict_to_change = self._wildcards if is_wildcard else self._topics_to_handlers
112+
113+
topic_handler = dict_to_change.get(channel)
84114
if topic_handler is None:
85-
topic_handler = TopicHandler(persist, queue_name)
115+
topic_handler = TopicHandler(persist, queue_name, channel if is_wildcard else None)
86116
topic_handler.callbacks |= callbacks
87-
self._topics_to_handlers[channel] = topic_handler
117+
dict_to_change[channel] = topic_handler
88118
else:
89119
topic_handler.callbacks |= callbacks
90120
topic_handler.topic_persist = persist
@@ -103,23 +133,38 @@ def remove_subscription_channel(self, channel: str) -> bool:
103133
try:
104134
del self._topics_to_handlers[channel]
105135
except KeyError:
106-
return False
107-
else:
108-
if self.is_connected():
109-
for provider in self._control_providers:
110-
provider.unsubscribe(channel)
111-
return True
136+
try:
137+
del self._wildcards[channel]
138+
except KeyError:
139+
return False
140+
if self.is_connected():
141+
for provider in self._control_providers:
142+
provider.unsubscribe(channel)
143+
return True
112144

113145
def get_subscription_channels(self) -> dict[str, TopicHandler]:
114146
"""Get the subscription channels.
115147
116-
Note that this function gets accessed as a callback from the direct broker implementations.
148+
These channels cannot be wildcards, and incoming topics must match the channel topics exactly. Note that this function gets accessed as a callback from the direct broker implementations.
117149
118150
Returns:
119151
the dictionary of topics to topic information
120152
"""
121153
return self._topics_to_handlers
122154

155+
def get_wildcard_topic_handler(self, topic: str) -> TopicHandler | None:
156+
"""Get the wildcard topic handler for a given topic, if it exists.
157+
158+
This is an inefficient lookup, so should only be used if we fail to find a non-wildcard match for an incoming topic. Note that this function gets accessed as a callback from the direct broker implementations.
159+
160+
Returns:
161+
the topic handler associated with the topic, if it exists; None otherwise
162+
"""
163+
for topic_handler in self._wildcards.values():
164+
if topic_handler.does_topic_match(topic):
165+
return topic_handler
166+
return None
167+
123168
def connect(self) -> None:
124169
"""Connect to all configured brokers.
125170

src/intersect_sdk_common/control_plane/topic_handler.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Attributes associated with a specific pub/sub topic."""
22

3+
import re
4+
35
from .definitions import MessageCallback
46

57

@@ -17,13 +19,45 @@ class TopicHandler:
1719
queue_name: str
1820
"""The name of the queue to subscribe to for this topic."""
1921

20-
def __init__(self, topic_persist: bool, queue_name: str) -> None:
22+
_regex_pattern: re.Pattern[str] | None = None
23+
"""If this is a wildcard topic, the regex pattern to match incoming topics against (if no wildcards, this will be None)."""
24+
25+
def __init__(
26+
self, topic_persist: bool, queue_name: str, regex_pattern: str | None = None
27+
) -> None:
2128
"""Initialize a TopicHandler instance.
2229
2330
Args:
2431
topic_persist: Whether the topic queue is expected to persist on the message broker.
2532
queue_name: The name of the queue to subscribe to for this topic.
33+
regex_pattern: If this is a wildcard topic, the regex pattern to match incoming topics against (if no wildcards, leave as None).
2634
"""
2735
self.callbacks = set()
2836
self.topic_persist = topic_persist
2937
self.queue_name = queue_name
38+
39+
if regex_pattern:
40+
regex_builder = ['^']
41+
for char in regex_pattern:
42+
if char == '*':
43+
# match a single word (do not include the separator)
44+
regex_builder.append('[a-zA-Z0-9-]+')
45+
elif char == '#':
46+
# match any sequence of 0 or more words (make sure to allow for the separator)
47+
regex_builder.append('[a-zA-Z0-9/-]*')
48+
elif char == '/':
49+
# in-memory topics use '/' as the separator, but on the broker '.' is used
50+
regex_builder.append('\\.')
51+
else:
52+
# ordinary character
53+
regex_builder.append(char)
54+
regex_builder.append('$')
55+
self._regex_pattern = re.compile(''.join(regex_builder))
56+
else:
57+
self._regex_pattern = None
58+
59+
def does_topic_match(self, topic: str) -> bool:
60+
"""Check if a given topic could be caught by the TopicHandler's topic regex. Only used for wildcard topics."""
61+
if self._regex_pattern:
62+
return bool(self._regex_pattern.match(topic))
63+
return False

src/intersect_sdk_common/exceptions.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,10 @@ class IntersectApplicationError(IntersectError):
1717
1818
This exception should strictly be used for control flow - it should NEVER be a fatal exception
1919
"""
20+
21+
22+
class IntersectSetupError(IntersectError):
23+
"""Exception thrown when there is an error in setting up the configuration of a Service/Client, PRIOR to publishing or handling a message.
24+
25+
This should be considered a fatal exception.
26+
"""

tests/unit/test_topic_handler.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from intersect_sdk_common.control_plane.topic_handler import TopicHandler
2+
3+
4+
def test_topic_handler_more_complex_wildcard():
5+
# test that the topic handler correctly matches topics based on its wildcard pattern
6+
topic_handler = TopicHandler(
7+
topic_persist=False, queue_name='irrelevant', regex_pattern='a/*/c/#'
8+
)
9+
assert (
10+
topic_handler.does_topic_match('a/b/c') is not None
11+
) # * should match b, # should match nothing
12+
assert (
13+
topic_handler.does_topic_match('a/x/c/d/e') is not None
14+
) # * should match x, # should match d/e
15+
assert not topic_handler.does_topic_match(
16+
'a/b/d'
17+
) # * should match b but then c doesn't match d
18+
assert not topic_handler.does_topic_match('x/b/c') # a doesn't match x

0 commit comments

Comments
 (0)