-
Notifications
You must be signed in to change notification settings - Fork 603
feat: Add experimental async transport (port of PR #4572) #5646
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
4f8a00c
82c0094
4b77519
c46fb6f
5ea3aac
ff85a58
156f32b
cb932d2
d19271e
299947d
71007ec
183e83b
8883b78
86d6e36
e74f4a7
91072bb
d64517f
94b6c73
38f97c2
1ac4196
b990610
b392bc4
36ad606
025714e
ff3e9a0
9b0a712
f59c38c
94666e1
a2a9588
a8823a9
ed040a1
ad2ceae
4a2a6a3
c88848e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,7 +11,7 @@ asttokens | |
| responses | ||
| pysocks | ||
| socksio | ||
| httpcore[http2] | ||
| httpcore[http2,asyncio] | ||
| setuptools | ||
| Brotli | ||
| docker | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,7 +33,7 @@ | |
| from sentry_sdk.serializer import serialize | ||
| from sentry_sdk.tracing import trace | ||
| from sentry_sdk.tracing_utils import has_span_streaming_enabled | ||
| from sentry_sdk.transport import BaseHttpTransport, make_transport | ||
| from sentry_sdk.transport import HttpTransportCore, make_transport, AsyncHttpTransport | ||
| from sentry_sdk.consts import ( | ||
| SPANDATA, | ||
| DEFAULT_MAX_VALUE_LENGTH, | ||
|
|
@@ -253,6 +253,12 @@ def close(self, *args: "Any", **kwargs: "Any") -> None: | |
| def flush(self, *args: "Any", **kwargs: "Any") -> None: | ||
| return None | ||
|
|
||
| async def close_async(self, *args: "Any", **kwargs: "Any") -> None: | ||
| return None | ||
|
|
||
| async def flush_async(self, *args: "Any", **kwargs: "Any") -> None: | ||
| return None | ||
|
|
||
| def __enter__(self) -> "BaseClient": | ||
| return self | ||
|
|
||
|
|
@@ -474,7 +480,7 @@ def _record_lost_event( | |
| or self.metrics_batcher | ||
| or self.span_batcher | ||
| or has_profiling_enabled(self.options) | ||
| or isinstance(self.transport, BaseHttpTransport) | ||
| or isinstance(self.transport, HttpTransportCore) | ||
BYK marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. uwsgi thread check incorrectly triggered for async transportLow Severity The guard condition was changed from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. uWSGI thread check falsely triggers for async transportLow Severity The |
||
| ): | ||
| # If we have anything on that could spawn a background thread, we | ||
| # need to check if it's safe to use them. | ||
|
|
@@ -1001,6 +1007,28 @@ def get_integration( | |
|
|
||
| return self.integrations.get(integration_name) | ||
|
|
||
| def _close_components(self) -> None: | ||
| """Kill all client components in the correct order.""" | ||
| self.session_flusher.kill() | ||
| if self.log_batcher is not None: | ||
| self.log_batcher.kill() | ||
| if self.metrics_batcher is not None: | ||
| self.metrics_batcher.kill() | ||
| if self.span_batcher is not None: | ||
| self.span_batcher.kill() | ||
| if self.monitor: | ||
| self.monitor.kill() | ||
|
|
||
| def _flush_components(self) -> None: | ||
| """Flush all client components.""" | ||
| self.session_flusher.flush() | ||
| if self.log_batcher is not None: | ||
| self.log_batcher.flush() | ||
| if self.metrics_batcher is not None: | ||
| self.metrics_batcher.flush() | ||
| if self.span_batcher is not None: | ||
| self.span_batcher.flush() | ||
|
|
||
| def close( | ||
| self, | ||
| timeout: "Optional[float]" = None, | ||
|
|
@@ -1011,19 +1039,43 @@ def close( | |
| semantics as :py:meth:`Client.flush`. | ||
| """ | ||
| if self.transport is not None: | ||
| if isinstance(self.transport, AsyncHttpTransport) and hasattr( | ||
| self.transport, "loop" | ||
| ): | ||
| logger.debug( | ||
| "close() used with AsyncHttpTransport, aborting. Please use close_async() instead." | ||
| ) | ||
BYK marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return | ||
BYK marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self.flush(timeout=timeout, callback=callback) | ||
| self.session_flusher.kill() | ||
| if self.log_batcher is not None: | ||
| self.log_batcher.kill() | ||
| if self.metrics_batcher is not None: | ||
| self.metrics_batcher.kill() | ||
| if self.span_batcher is not None: | ||
| self.span_batcher.kill() | ||
| if self.monitor: | ||
| self.monitor.kill() | ||
| self._close_components() | ||
BYK marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+1048
to
+1056
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Calling the synchronous Suggested FixThe synchronous Prompt for AI Agent |
||
| self.transport.kill() | ||
| self.transport = None | ||
|
|
||
| async def close_async( | ||
| self, | ||
| timeout: "Optional[float]" = None, | ||
| callback: "Optional[Callable[[int, float], None]]" = None, | ||
| ) -> None: | ||
| """ | ||
| Asynchronously close the client and shut down the transport. Arguments have the same | ||
| semantics as :py:meth:`Client.flush_async`. | ||
| """ | ||
| if self.transport is not None: | ||
| if not ( | ||
| isinstance(self.transport, AsyncHttpTransport) | ||
| and hasattr(self.transport, "loop") | ||
| ): | ||
| logger.debug( | ||
BYK marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| "close_async() used with non-async transport, aborting. Please use close() instead." | ||
| ) | ||
| return | ||
BYK marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| await self.flush_async(timeout=timeout, callback=callback) | ||
| self._close_components() | ||
github-actions[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| kill_task = self.transport.kill() # type: ignore | ||
| if kill_task is not None: | ||
BYK marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| await kill_task | ||
| self.transport = None | ||
|
|
||
| def flush( | ||
| self, | ||
| timeout: "Optional[float]" = None, | ||
|
|
@@ -1037,17 +1089,47 @@ def flush( | |
| :param callback: Is invoked with the number of pending events and the configured timeout. | ||
| """ | ||
| if self.transport is not None: | ||
| if isinstance(self.transport, AsyncHttpTransport) and hasattr( | ||
| self.transport, "loop" | ||
| ): | ||
BYK marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| logger.debug( | ||
| "flush() used with AsyncHttpTransport, aborting. Please use flush_async() instead." | ||
| ) | ||
| return | ||
| if timeout is None: | ||
| timeout = self.options["shutdown_timeout"] | ||
| self.session_flusher.flush() | ||
| if self.log_batcher is not None: | ||
| self.log_batcher.flush() | ||
| if self.metrics_batcher is not None: | ||
| self.metrics_batcher.flush() | ||
| if self.span_batcher is not None: | ||
| self.span_batcher.flush() | ||
| self._flush_components() | ||
|
|
||
| self.transport.flush(timeout=timeout, callback=callback) | ||
|
|
||
| async def flush_async( | ||
| self, | ||
| timeout: "Optional[float]" = None, | ||
| callback: "Optional[Callable[[int, float], None]]" = None, | ||
| ) -> None: | ||
| """ | ||
| Asynchronously wait for the current events to be sent. | ||
|
|
||
| :param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used. | ||
|
|
||
| :param callback: Is invoked with the number of pending events and the configured timeout. | ||
| """ | ||
| if self.transport is not None: | ||
| if not ( | ||
| isinstance(self.transport, AsyncHttpTransport) | ||
| and hasattr(self.transport, "loop") | ||
| ): | ||
BYK marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| logger.debug( | ||
| "flush_async() used with non-async transport, aborting. Please use flush() instead." | ||
| ) | ||
| return | ||
| if timeout is None: | ||
| timeout = self.options["shutdown_timeout"] | ||
| self._flush_components() | ||
| flush_task = self.transport.flush(timeout=timeout, callback=callback) # type: ignore | ||
| if flush_task is not None: | ||
| await flush_task | ||
|
|
||
| def __enter__(self) -> "_Client": | ||
| return self | ||
|
|
||
|
|
||


Uh oh!
There was an error while loading. Please reload this page.