Skip to content

Commit 9acfeff

Browse files
committed
fix(grpc): allow multiple wildcard subscriptions when topic validation disabled
Signed-off-by: Rishabh Dewangan <107680241+Rishabh-git10@users.noreply.github.com>
1 parent 51d6e93 commit 9acfeff

2 files changed

Lines changed: 73 additions & 17 deletions

File tree

ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,17 @@ def __init__(self):
8181
self._registered_topics: List[appcallback_v1.TopicSubscription] = []
8282
self._registered_bindings: List[str] = []
8383

84+
self._wildcard_topics: List[Tuple[str, str, str, bool, TopicSubscribeCallable]] = []
85+
86+
def _match_topic(self, pattern: str, topic: str) -> bool:
87+
import re
88+
89+
re_pattern = re.escape(pattern)
90+
re_pattern = re_pattern.replace(r'\+', r'[^/]*')
91+
re_pattern = re_pattern.replace(r'\*', r'[^/.]+')
92+
re_pattern = re_pattern.replace(r'\#', r'.*')
93+
return bool(re.match(f'^{re_pattern}$', topic))
94+
8495
def register_method(self, method: str, cb: InvokeMethodCallable) -> None:
8596
"""Registers method for service invocation."""
8697
if method in self._invoke_method_map:
@@ -98,17 +109,16 @@ def register_topic(
98109
disable_topic_validation: Optional[bool] = False,
99110
) -> None:
100111
"""Registers topic subscription for pubsub."""
101-
if not disable_topic_validation:
102-
topic_key = pubsub_name + DELIMITER + topic
103-
else:
104-
topic_key = pubsub_name
112+
topic_key = pubsub_name + DELIMITER + topic
105113
pubsub_topic = topic_key + DELIMITER
114+
path = ''
106115
if rule is not None:
107116
path = getattr(cb, '__name__', rule.match)
108117
pubsub_topic = pubsub_topic + path
109118
if pubsub_topic in self._topic_map:
110119
raise ValueError(f'{topic} is already registered with {pubsub_name}')
111120
self._topic_map[pubsub_topic] = cb
121+
self._wildcard_topics.append((pubsub_name, topic, path, disable_topic_validation, cb))
112122

113123
registered_topic = self._registered_topics_map.get(topic_key)
114124
sub: appcallback_v1.TopicSubscription = appcallback_v1.TopicSubscription()
@@ -197,14 +207,21 @@ def ListTopicSubscriptions(self, request, context):
197207
def OnTopicEvent(self, request: TopicEventRequest, context):
198208
"""Subscribes events from Pubsub."""
199209
pubsub_topic = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path
200-
no_validation_key = request.pubsub_name + DELIMITER + request.path
201210

202-
if pubsub_topic not in self._topic_map:
203-
if no_validation_key in self._topic_map:
204-
pubsub_topic = no_validation_key
205-
else:
211+
if pubsub_topic in self._topic_map:
212+
cb = self._topic_map[pubsub_topic]
213+
else:
214+
cb = None
215+
for p_name, t_pattern, path, d_val, w_cb in self._wildcard_topics:
216+
if p_name == request.pubsub_name and request.path == path:
217+
if self._match_topic(t_pattern, request.topic) or (
218+
d_val and not any(c in t_pattern for c in ['+', '#', '*'])
219+
):
220+
cb = w_cb
221+
break
222+
if cb is None:
206223
context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
207-
raise NotImplementedError(f'topic {request.topic} is not implemented!')
224+
raise NotImplementedError(f'topic {request.topic} topic not implemented!')
208225

209226
customdata: Struct = request.extensions
210227
extensions = dict()
@@ -222,7 +239,7 @@ def OnTopicEvent(self, request: TopicEventRequest, context):
222239
event.SetSubject(request.topic)
223240
event.SetExtensions(extensions)
224241

225-
response = self._topic_map[pubsub_topic](event)
242+
response = cb(event)
226243
if isinstance(response, TopicEventResponse):
227244
return appcallback_v1.TopicEventResponse(status=response.status.value)
228245
return empty_pb2.Empty()
@@ -293,13 +310,20 @@ def _handle_bulk_topic_event(
293310
) -> Optional[TopicEventBulkResponse]:
294311
"""Process bulk topic event request - routes each entry to the appropriate topic handler."""
295312
topic_key = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path
296-
no_validation_key = request.pubsub_name + DELIMITER + request.path
297313

298-
if topic_key not in self._topic_map and no_validation_key not in self._topic_map:
299-
return None # we don't have a handler
300-
301-
handler_key = topic_key if topic_key in self._topic_map else no_validation_key
302-
cb = self._topic_map[handler_key] # callback
314+
if topic_key in self._topic_map:
315+
cb = self._topic_map[topic_key] # callback
316+
else:
317+
cb = None
318+
for p_name, t_pattern, path, d_val, w_cb in self._wildcard_topics:
319+
if p_name == request.pubsub_name and request.path == path:
320+
if self._match_topic(t_pattern, request.topic) or (
321+
d_val and not any(c in t_pattern for c in ['+', '#', '*'])
322+
):
323+
cb = w_cb
324+
break
325+
if cb is None:
326+
return None # we don't have a handler
303327

304328
statuses = []
305329
for entry in request.entries:

ext/dapr-ext-grpc/tests/test_servicier.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,38 @@ def test_non_registered_topic(self):
182182
self.fake_context,
183183
)
184184

185+
def test_multiple_wildcard_subscriptions(self):
186+
self._servicer.register_topic(
187+
'pubsub_multi_wildcard',
188+
'orders/+/items',
189+
self._topic1_method,
190+
None,
191+
disable_topic_validation=True,
192+
)
193+
self._servicer.register_topic(
194+
'pubsub_multi_wildcard',
195+
'inventory/#',
196+
self._topic2_method,
197+
None,
198+
disable_topic_validation=True,
199+
)
200+
201+
self._servicer.OnTopicEvent(
202+
appcallback_v1.TopicEventRequest(
203+
pubsub_name='pubsub_multi_wildcard', topic='orders/123/items'
204+
),
205+
self.fake_context,
206+
)
207+
self._topic1_method.assert_called_once()
208+
209+
self._servicer.OnTopicEvent(
210+
appcallback_v1.TopicEventRequest(
211+
pubsub_name='pubsub_multi_wildcard', topic='inventory/warehouse/aisle4'
212+
),
213+
self.fake_context,
214+
)
215+
self._topic2_method.assert_called_once()
216+
185217

186218
class BulkTopicEventTests(unittest.TestCase):
187219
def setUp(self):

0 commit comments

Comments
 (0)