Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ def __init__(self):
self._registered_topics: List[appcallback_v1.TopicSubscription] = []
self._registered_bindings: List[str] = []

self._wildcard_topics: List[Tuple[str, str, str, bool, TopicSubscribeCallable]] = []

def _match_topic(self, pattern: str, topic: str) -> bool:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to use a different approach, this would be a second filter on top of what the pubsub already filters by matching a wildcard pattern to a concrete topic name.

More on the following comments

import re

re_pattern = re.escape(pattern)
re_pattern = re_pattern.replace(r'\+', r'[^/]*')
re_pattern = re_pattern.replace(r'\*', r'[^/.]+')
re_pattern = re_pattern.replace(r'\#', r'.*')
return bool(re.match(f'^{re_pattern}$', topic))

def register_method(self, method: str, cb: InvokeMethodCallable) -> None:
"""Registers method for service invocation."""
if method in self._invoke_method_map:
Expand All @@ -98,17 +109,16 @@ def register_topic(
disable_topic_validation: Optional[bool] = False,
) -> None:
"""Registers topic subscription for pubsub."""
if not disable_topic_validation:
topic_key = pubsub_name + DELIMITER + topic
else:
topic_key = pubsub_name
topic_key = pubsub_name + DELIMITER + topic
pubsub_topic = topic_key + DELIMITER
path = ''
if rule is not None:
path = getattr(cb, '__name__', rule.match)
pubsub_topic = pubsub_topic + path
if pubsub_topic in self._topic_map:
raise ValueError(f'{topic} is already registered with {pubsub_name}')
self._topic_map[pubsub_topic] = cb
self._wildcard_topics.append((pubsub_name, topic, path, disable_topic_validation, cb))

registered_topic = self._registered_topics_map.get(topic_key)
sub: appcallback_v1.TopicSubscription = appcallback_v1.TopicSubscription()
Expand Down Expand Up @@ -197,14 +207,21 @@ def ListTopicSubscriptions(self, request, context):
def OnTopicEvent(self, request: TopicEventRequest, context):
"""Subscribes events from Pubsub."""
pubsub_topic = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path
no_validation_key = request.pubsub_name + DELIMITER + request.path

if pubsub_topic not in self._topic_map:
if no_validation_key in self._topic_map:
pubsub_topic = no_validation_key
else:
if pubsub_topic in self._topic_map:
cb = self._topic_map[pubsub_topic]
else:
cb = None
for p_name, t_pattern, path, d_val, w_cb in self._wildcard_topics:
if p_name == request.pubsub_name and request.path == path:
if self._match_topic(t_pattern, request.topic) or (
d_val and not any(c in t_pattern for c in ['+', '#', '*'])
):
cb = w_cb
break
Comment on lines +214 to +221

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing the regex match you had before, you can look up the cb on the route map

if cb is None:
context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
raise NotImplementedError(f'topic {request.topic} is not implemented!')
raise NotImplementedError(f'topic {request.topic} topic not implemented!')

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this a typo 😅


customdata: Struct = request.extensions
extensions = dict()
Expand All @@ -222,7 +239,7 @@ def OnTopicEvent(self, request: TopicEventRequest, context):
event.SetSubject(request.topic)
event.SetExtensions(extensions)

response = self._topic_map[pubsub_topic](event)
response = cb(event)
if isinstance(response, TopicEventResponse):
return appcallback_v1.TopicEventResponse(status=response.status.value)
return empty_pb2.Empty()
Expand Down Expand Up @@ -293,13 +310,20 @@ def _handle_bulk_topic_event(
) -> Optional[TopicEventBulkResponse]:
"""Process bulk topic event request - routes each entry to the appropriate topic handler."""
topic_key = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path
no_validation_key = request.pubsub_name + DELIMITER + request.path

if topic_key not in self._topic_map and no_validation_key not in self._topic_map:
return None # we don't have a handler

handler_key = topic_key if topic_key in self._topic_map else no_validation_key
cb = self._topic_map[handler_key] # callback
if topic_key in self._topic_map:
cb = self._topic_map[topic_key] # callback
else:
cb = None
for p_name, t_pattern, path, d_val, w_cb in self._wildcard_topics:
if p_name == request.pubsub_name and request.path == path:
if self._match_topic(t_pattern, request.topic) or (
d_val and not any(c in t_pattern for c in ['+', '#', '*'])
):
cb = w_cb
break
if cb is None:
return None # we don't have a handler
Comment on lines +317 to +326

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above. Logic looks pretty similar and could be moved to a helper method too


statuses = []
for entry in request.entries:
Expand Down
32 changes: 32 additions & 0 deletions ext/dapr-ext-grpc/tests/test_servicier.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,38 @@ def test_non_registered_topic(self):
self.fake_context,
)

def test_multiple_wildcard_subscriptions(self):
self._servicer.register_topic(
'pubsub_multi_wildcard',
'orders/+/items',
self._topic1_method,
None,
disable_topic_validation=True,
)
self._servicer.register_topic(
'pubsub_multi_wildcard',
'inventory/#',
self._topic2_method,
None,
disable_topic_validation=True,
)

self._servicer.OnTopicEvent(
appcallback_v1.TopicEventRequest(
pubsub_name='pubsub_multi_wildcard', topic='orders/123/items'
),
self.fake_context,
)
self._topic1_method.assert_called_once()

self._servicer.OnTopicEvent(
appcallback_v1.TopicEventRequest(
pubsub_name='pubsub_multi_wildcard', topic='inventory/warehouse/aisle4'
),
self.fake_context,
)
self._topic2_method.assert_called_once()


class BulkTopicEventTests(unittest.TestCase):
def setUp(self):
Expand Down