Skip to content

Commit 223946d

Browse files
committed
refactor(grpc): address review feedback by optimizing routing logic
Signed-off-by: Rishabh Dewangan <107680241+Rishabh-git10@users.noreply.github.com>
1 parent 9acfeff commit 223946d

2 files changed

Lines changed: 31 additions & 43 deletions

File tree

ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py

Lines changed: 27 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,24 @@ def __init__(self):
8181
self._registered_topics: List[appcallback_v1.TopicSubscription] = []
8282
self._registered_bindings: List[str] = []
8383

84-
self._wildcard_topics: List[Tuple[str, str, str, bool, TopicSubscribeCallable]] = []
84+
self._route_map: Dict[Tuple[str, str], TopicSubscribeCallable] = {}
85+
self._validation_disabled_pubsubs: Dict[str, TopicSubscribeCallable] = {}
8586

86-
def _match_topic(self, pattern: str, topic: str) -> bool:
87-
import re
87+
def _get_topic_callback(
88+
self, pubsub_name: str, topic: str, path: str
89+
) -> Optional[TopicSubscribeCallable]:
90+
pubsub_topic = pubsub_name + DELIMITER + topic + DELIMITER + path
91+
if pubsub_topic in self._topic_map:
92+
return self._topic_map[pubsub_topic]
93+
94+
if (pubsub_name, path) in self._route_map:
95+
return self._route_map[(pubsub_name, path)]
96+
97+
if path == '':
98+
if pubsub_name in self._validation_disabled_pubsubs:
99+
return self._validation_disabled_pubsubs[pubsub_name]
88100

89-
re_pattern = re.escape(pattern)
90-
re_pattern = re_pattern.replace(r'\+', r'[^/]*')
91-
re_pattern = re_pattern.replace(r'\*', r'[^/.]+')
92-
re_pattern = re_pattern.replace(r'\#', r'.*')
93-
return bool(re.match(f'^{re_pattern}$', topic))
101+
return None
94102

95103
def register_method(self, method: str, cb: InvokeMethodCallable) -> None:
96104
"""Registers method for service invocation."""
@@ -111,14 +119,16 @@ def register_topic(
111119
"""Registers topic subscription for pubsub."""
112120
topic_key = pubsub_name + DELIMITER + topic
113121
pubsub_topic = topic_key + DELIMITER
114-
path = ''
115122
if rule is not None:
116123
path = getattr(cb, '__name__', rule.match)
117124
pubsub_topic = pubsub_topic + path
118125
if pubsub_topic in self._topic_map:
119126
raise ValueError(f'{topic} is already registered with {pubsub_name}')
120127
self._topic_map[pubsub_topic] = cb
121-
self._wildcard_topics.append((pubsub_name, topic, path, disable_topic_validation, cb))
128+
self._route_map[(pubsub_name, topic)] = cb
129+
130+
if disable_topic_validation:
131+
self._validation_disabled_pubsubs[pubsub_name] = cb
122132

123133
registered_topic = self._registered_topics_map.get(topic_key)
124134
sub: appcallback_v1.TopicSubscription = appcallback_v1.TopicSubscription()
@@ -206,22 +216,10 @@ def ListTopicSubscriptions(self, request, context):
206216

207217
def OnTopicEvent(self, request: TopicEventRequest, context):
208218
"""Subscribes events from Pubsub."""
209-
pubsub_topic = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path
210-
211-
if pubsub_topic in self._topic_map:
212-
cb = self._topic_map[pubsub_topic]
213-
else:
214-
cb = None
215-
for p_name, t_pattern, path, d_val, w_cb in self._wildcard_topics:
216-
if p_name == request.pubsub_name and request.path == path:
217-
if self._match_topic(t_pattern, request.topic) or (
218-
d_val and not any(c in t_pattern for c in ['+', '#', '*'])
219-
):
220-
cb = w_cb
221-
break
222-
if cb is None:
223-
context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
224-
raise NotImplementedError(f'topic {request.topic} topic not implemented!')
219+
cb = self._get_topic_callback(request.pubsub_name, request.topic, request.path)
220+
if cb is None:
221+
context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
222+
raise NotImplementedError(f'topic {request.topic} is not implemented!')
225223

226224
customdata: Struct = request.extensions
227225
extensions = dict()
@@ -309,21 +307,9 @@ def _handle_bulk_topic_event(
309307
self, request: TopicEventBulkRequest, context
310308
) -> Optional[TopicEventBulkResponse]:
311309
"""Process bulk topic event request - routes each entry to the appropriate topic handler."""
312-
topic_key = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path
313-
314-
if topic_key in self._topic_map:
315-
cb = self._topic_map[topic_key] # callback
316-
else:
317-
cb = None
318-
for p_name, t_pattern, path, d_val, w_cb in self._wildcard_topics:
319-
if p_name == request.pubsub_name and request.path == path:
320-
if self._match_topic(t_pattern, request.topic) or (
321-
d_val and not any(c in t_pattern for c in ['+', '#', '*'])
322-
):
323-
cb = w_cb
324-
break
325-
if cb is None:
326-
return None # we don't have a handler
310+
cb = self._get_topic_callback(request.pubsub_name, request.topic, request.path)
311+
if cb is None:
312+
return None # we don't have a handler
327313

328314
statuses = []
329315
for entry in request.entries:

ext/dapr-ext-grpc/tests/test_servicier.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,15 +200,17 @@ def test_multiple_wildcard_subscriptions(self):
200200

201201
self._servicer.OnTopicEvent(
202202
appcallback_v1.TopicEventRequest(
203-
pubsub_name='pubsub_multi_wildcard', topic='orders/123/items'
203+
pubsub_name='pubsub_multi_wildcard', topic='orders/123/items', path='orders/+/items'
204204
),
205205
self.fake_context,
206206
)
207207
self._topic1_method.assert_called_once()
208208

209209
self._servicer.OnTopicEvent(
210210
appcallback_v1.TopicEventRequest(
211-
pubsub_name='pubsub_multi_wildcard', topic='inventory/warehouse/aisle4'
211+
pubsub_name='pubsub_multi_wildcard',
212+
topic='inventory/warehouse/aisle4',
213+
path='inventory/#',
212214
),
213215
self.fake_context,
214216
)

0 commit comments

Comments
 (0)