-
Notifications
You must be signed in to change notification settings - Fork 146
fix(grpc): allow multiple wildcard subscriptions when topic validation disabled #1083
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -81,6 +81,17 @@ def __init__(self): | |
| self._registered_topics: List[appcallback_v1.TopicSubscription] = [] | ||
| self._registered_bindings: List[str] = [] | ||
|
|
||
| self._wildcard_topics: List[Tuple[str, str, str, bool, TopicSubscribeCallable]] = [] | ||
|
|
||
| def _match_topic(self, pattern: str, topic: str) -> bool: | ||
| import re | ||
|
|
||
| re_pattern = re.escape(pattern) | ||
| re_pattern = re_pattern.replace(r'\+', r'[^/]*') | ||
| re_pattern = re_pattern.replace(r'\*', r'[^/.]+') | ||
| re_pattern = re_pattern.replace(r'\#', r'.*') | ||
| return bool(re.match(f'^{re_pattern}$', topic)) | ||
|
|
||
| def register_method(self, method: str, cb: InvokeMethodCallable) -> None: | ||
| """Registers method for service invocation.""" | ||
| if method in self._invoke_method_map: | ||
|
|
@@ -98,17 +109,16 @@ def register_topic( | |
| disable_topic_validation: Optional[bool] = False, | ||
| ) -> None: | ||
| """Registers topic subscription for pubsub.""" | ||
| if not disable_topic_validation: | ||
| topic_key = pubsub_name + DELIMITER + topic | ||
| else: | ||
| topic_key = pubsub_name | ||
| topic_key = pubsub_name + DELIMITER + topic | ||
| pubsub_topic = topic_key + DELIMITER | ||
| path = '' | ||
| if rule is not None: | ||
| path = getattr(cb, '__name__', rule.match) | ||
| pubsub_topic = pubsub_topic + path | ||
| if pubsub_topic in self._topic_map: | ||
| raise ValueError(f'{topic} is already registered with {pubsub_name}') | ||
| self._topic_map[pubsub_topic] = cb | ||
| self._wildcard_topics.append((pubsub_name, topic, path, disable_topic_validation, cb)) | ||
|
|
||
| registered_topic = self._registered_topics_map.get(topic_key) | ||
| sub: appcallback_v1.TopicSubscription = appcallback_v1.TopicSubscription() | ||
|
|
@@ -197,14 +207,21 @@ def ListTopicSubscriptions(self, request, context): | |
| def OnTopicEvent(self, request: TopicEventRequest, context): | ||
| """Subscribes events from Pubsub.""" | ||
| pubsub_topic = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path | ||
| no_validation_key = request.pubsub_name + DELIMITER + request.path | ||
|
|
||
| if pubsub_topic not in self._topic_map: | ||
| if no_validation_key in self._topic_map: | ||
| pubsub_topic = no_validation_key | ||
| else: | ||
| if pubsub_topic in self._topic_map: | ||
| cb = self._topic_map[pubsub_topic] | ||
| else: | ||
| cb = None | ||
| for p_name, t_pattern, path, d_val, w_cb in self._wildcard_topics: | ||
| if p_name == request.pubsub_name and request.path == path: | ||
| if self._match_topic(t_pattern, request.topic) or ( | ||
| d_val and not any(c in t_pattern for c in ['+', '#', '*']) | ||
| ): | ||
| cb = w_cb | ||
| break | ||
|
Comment on lines
+214
to
+221
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of doing the regex match you had before, you can look up the cb on the route map |
||
| if cb is None: | ||
| context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore | ||
| raise NotImplementedError(f'topic {request.topic} is not implemented!') | ||
| raise NotImplementedError(f'topic {request.topic} topic not implemented!') | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was this a typo 😅 |
||
|
|
||
| customdata: Struct = request.extensions | ||
| extensions = dict() | ||
|
|
@@ -222,7 +239,7 @@ def OnTopicEvent(self, request: TopicEventRequest, context): | |
| event.SetSubject(request.topic) | ||
| event.SetExtensions(extensions) | ||
|
|
||
| response = self._topic_map[pubsub_topic](event) | ||
| response = cb(event) | ||
| if isinstance(response, TopicEventResponse): | ||
| return appcallback_v1.TopicEventResponse(status=response.status.value) | ||
| return empty_pb2.Empty() | ||
|
|
@@ -293,13 +310,20 @@ def _handle_bulk_topic_event( | |
| ) -> Optional[TopicEventBulkResponse]: | ||
| """Process bulk topic event request - routes each entry to the appropriate topic handler.""" | ||
| topic_key = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path | ||
| no_validation_key = request.pubsub_name + DELIMITER + request.path | ||
|
|
||
| if topic_key not in self._topic_map and no_validation_key not in self._topic_map: | ||
| return None # we don't have a handler | ||
|
|
||
| handler_key = topic_key if topic_key in self._topic_map else no_validation_key | ||
| cb = self._topic_map[handler_key] # callback | ||
| if topic_key in self._topic_map: | ||
| cb = self._topic_map[topic_key] # callback | ||
| else: | ||
| cb = None | ||
| for p_name, t_pattern, path, d_val, w_cb in self._wildcard_topics: | ||
| if p_name == request.pubsub_name and request.path == path: | ||
| if self._match_topic(t_pattern, request.topic) or ( | ||
| d_val and not any(c in t_pattern for c in ['+', '#', '*']) | ||
| ): | ||
| cb = w_cb | ||
| break | ||
| if cb is None: | ||
| return None # we don't have a handler | ||
|
Comment on lines
+317
to
+326
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above. Logic looks pretty similar and could be moved to a helper method too |
||
|
|
||
| statuses = [] | ||
| for entry in request.entries: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to use a different approach, this would be a second filter on top of what the pubsub already filters by matching a wildcard pattern to a concrete topic name.
More on the following comments