
Commit 5573478

fix: runtime issue in notebooks when using python 3.14 (DM-3678) (#2560)
1 parent 6c915a9 commit 5573478

14 files changed

Lines changed: 891 additions & 111 deletions


CHANGELOG.md

Lines changed: 0 additions & 1 deletion
@@ -99,7 +99,6 @@ As of 2025-08-29, changes are grouped as follows
 - **global_config**: New setting `follow_redirects` that controls whether or not to follow redirects. Defaults to `False`.
 - **global_config**: New setting `file_download_chunk_size` that allows you to override the chunk size for streaming file downloads. Defaults to `None` (auto).
 - **global_config**: New setting `file_upload_chunk_size` that allows you to override the chunk size for streaming file uploads.
-- **global_config**: New setting `event_loop`, allowing you to override the default event loop used by the SDK.
 - **global_config**: `proxies` have been replaced by `proxy` and follow httpx directly. See: [Proxies - HTTPX](https://www.python-httpx.org/advanced/proxies/)
 - **global_config**: `max_retry_backoff` default has been increased from 30 sec to 60 sec.
 - **global_config**: `max_connection_pool_size` default has been reduced from 50 to 20.

MIGRATION_GUIDE.md

Lines changed: 1 addition & 2 deletions
@@ -63,13 +63,12 @@ Changes are grouped as follows:
 - For `class Transformation`, which used to have an async `run` method, this is now named `run_async` to unify the overall interface. The same applies to the `cancel` and `jobs` methods for the same class, and `update` and `wait` on `TransformationJob`.
 - **ClientConfig**:
   - `max_workers` has functionally been removed (just throws a warning). Concurrency is now controlled via `global_config.concurrency_settings`.
-    See the [Settings documentation](https://cognite-sdk-python.readthedocs-hosted.com/en/v8/settings.html#concurrency-settings) for details.
+    See the [Settings documentation](https://cognite-sdk-python.readthedocs-hosted.com/en/latest/settings.html#concurrency-settings) for details.
   - `timeout`: default has been increased from 30 sec to 60 sec
 - **global_config**:
   - New setting `follow_redirects` that controls whether or not to follow redirects. Defaults to `False`.
   - New setting `file_download_chunk_size` that allows you to override the chunk size for streaming file downloads. Defaults to `None` (auto).
   - New setting `file_upload_chunk_size` that allows you to override the chunk size for streaming file uploads.
-  - New setting `event_loop`, allowing you to override the default event loop used by the SDK
   - `proxies` have been replaced by `proxy` and follow httpx directly. See: [Proxies - HTTPX](https://www.python-httpx.org/advanced/proxies/)
   - `max_retry_backoff`: default has been increased from 30 sec to 60 sec
   - `max_connection_pool_size`: default has been reduced from 50 to 20

cognite/client/_http_client.py

Lines changed: 19 additions & 3 deletions
@@ -40,8 +40,19 @@ def set_cookie(self, cookie: Cookie) -> None:
         pass
 
 
-@functools.cache
+# One httpx.AsyncClient per event loop to avoid sharing connections (and their
+# loop-bound asyncio primitives) across different loops. This matters when a sync
+# CogniteClient (background loop) and an AsyncCogniteClient (e.g. Jupyter's loop)
+# coexist in the same process:
+_global_async_httpx_clients: dict[asyncio.AbstractEventLoop, httpx.AsyncClient] = {}
+
+
 def get_global_async_httpx_client() -> httpx.AsyncClient:
+    loop = asyncio.get_running_loop()
+    try:
+        return _global_async_httpx_clients[loop]
+    except KeyError:
+        pass
     async_transport = httpx.AsyncHTTPTransport(
         proxy=global_config.proxy,
         retries=0,  # 'retries': The maximum number of retries when trying to establish a connection.
@@ -59,14 +70,15 @@ def get_global_async_httpx_client() -> httpx.AsyncClient:
             keepalive_expiry=5,  # copy httpx default
         ),
     )
-    return httpx.AsyncClient(
+    client = _global_async_httpx_clients[loop] = httpx.AsyncClient(
         transport=async_transport,
         follow_redirects=global_config.follow_redirects,
         cookies=NoCookiesPlease(),
         # Below should not be needed when we pass transport, but... :)
         proxy=global_config.proxy,
         verify=not global_config.disable_ssl,
     )
+    return client
 
 
 class AsyncHTTPClientWithRetryConfig:
@@ -196,7 +208,11 @@ def __init__(
     ) -> None:
         self.config = config
         self.refresh_auth_header = refresh_auth_header
-        self.httpx_async_client = httpx_async_client or get_global_async_httpx_client()
+        self._httpx_async_client = httpx_async_client
+
+    @property
+    def httpx_async_client(self) -> httpx.AsyncClient:
+        return self._httpx_async_client or get_global_async_httpx_client()
 
     async def request(
         self,

cognite/client/config.py

Lines changed: 2 additions & 5 deletions
@@ -1,6 +1,5 @@
 from __future__ import annotations
 
-import asyncio
 import getpass
 import pprint
 import re
@@ -37,7 +36,7 @@ class GlobalConfig:
         max_workers (int): DEPRECATED: Use 'concurrency_settings' instead. Maximum number of concurrent API calls. Defaults to 5.
         concurrency_settings (ConcurrencySettings): Settings controlling the maximum number of concurrent API requests
             for different API categories (general, raw, data_modeling etc.). These settings are frozen after the
-            first API request is made. See https://cognite-sdk-python.readthedocs-hosted.com/en/v8/settings.html#concurrency-settings
+            first API request is made. See https://cognite-sdk-python.readthedocs-hosted.com/en/latest/settings.html#concurrency-settings
         follow_redirects (bool): Whether or not to follow redirects. Defaults to False.
         file_download_chunk_size (int | None): Specify the file chunk size for streaming file downloads. When not specified
             (default is None), the actual chunk size is determined by the underlying transport, which in turn is based on the
@@ -47,7 +46,6 @@ class GlobalConfig:
             translates to 65536 (64KiB chunks).
         silence_feature_preview_warnings (bool): Whether or not to silence warnings triggered by using alpha or beta
             features. Defaults to False.
-        event_loop (asyncio.AbstractEventLoop | None): Override the default event loop used by the SDK.
     """
 
     def __new__(cls) -> GlobalConfig:
@@ -77,7 +75,6 @@ def __init__(self) -> None:
         self.file_download_chunk_size: int | None = None
         self.file_upload_chunk_size: int | None = None
         self.silence_feature_preview_warnings: bool = False
-        self.event_loop: asyncio.AbstractEventLoop | None = None
 
     @property
     def max_workers(self) -> int:
@@ -88,7 +85,7 @@ def max_workers(self, value: int) -> None:
         warnings.warn(
             "'max_workers' is no longer in use in the SDK as of v8, and will be removed in the next major version. "
             "Use 'global_config.concurrency_settings' instead for fine-grained control. For more info: "
-            "https://cognite-sdk-python.readthedocs-hosted.com/en/v8/settings.html#concurrency-settings",
+            "https://cognite-sdk-python.readthedocs-hosted.com/en/latest/settings.html#concurrency-settings",
             FutureWarning,
             stacklevel=2,
         )

cognite/client/utils/_concurrency.py

Lines changed: 36 additions & 82 deletions
@@ -7,7 +7,6 @@
 from abc import ABC, abstractmethod
 from collections import UserList
 from collections.abc import Callable, Coroutine
-from functools import cache
 from typing import (
     Any,
     Literal,
@@ -19,7 +18,7 @@
 from typing_extensions import assert_never
 
 from cognite.client._constants import _RUNNING_IN_BROWSER
-from cognite.client.exceptions import CogniteAPIError, CogniteDuplicatedError, CogniteImportError, CogniteNotFoundError
+from cognite.client.exceptions import CogniteAPIError, CogniteDuplicatedError, CogniteNotFoundError
 from cognite.client.utils._auxiliary import no_op
 
 
@@ -50,6 +49,7 @@ def __init__(
         self._read = read
         self._write = write
         self._delete = delete
+        self._semaphore_cache: dict[tuple[str, str, asyncio.AbstractEventLoop], asyncio.BoundedSemaphore] = {}
 
     @property
     def read(self) -> int:
@@ -79,7 +79,7 @@ def delete(self, value: int) -> None:
         self._delete = value
 
     @abstractmethod
-    def _semaphore_factory(self, operation: str, project: str) -> asyncio.BoundedSemaphore: ...
+    def _semaphore_factory(self, operation: Any, project: str) -> asyncio.BoundedSemaphore: ...
 
     @abstractmethod
     def __repr__(self) -> str: ...
@@ -97,24 +97,30 @@ class CRUDConcurrency(ConcurrencyConfig):
         delete (int): Maximum number of concurrent delete requests.
     """
 
-    @cache
     def _semaphore_factory(
         self, operation: Literal["read", "write", "delete"], project: str
     ) -> asyncio.BoundedSemaphore:
-        # We include 'project' in the cache, since concurrency limits should apply per-project
+        # We include 'project' in the cache key, since concurrency limits should apply per-project.
+        # We include the event loop because semaphores are bound to the loop they're first used on,
+        # so the sync client (background loop) and async client (e.g. Jupyter's loop) need separate instances.
+        key = (operation, project, asyncio.get_running_loop())
+        if key in self._semaphore_cache:
+            return self._semaphore_cache[key]
         from cognite.client import global_config
 
         global_config.concurrency_settings._freeze()  # Disallow any further changes
 
         match operation:
             case "read":
-                return asyncio.BoundedSemaphore(self.read)
+                sem = asyncio.BoundedSemaphore(self.read)
             case "write":
-                return asyncio.BoundedSemaphore(self.write)
+                sem = asyncio.BoundedSemaphore(self.write)
             case "delete":
-                return asyncio.BoundedSemaphore(self.delete)
+                sem = asyncio.BoundedSemaphore(self.delete)
             case _:
                 assert_never(operation)
+        self._semaphore_cache[key] = sem
+        return sem
 
     def __repr__(self) -> str:
         return f"Concurrency[{self.api_name}](read={self._read}, write={self._write}, delete={self._delete})"
@@ -178,29 +184,32 @@ def write_schema(self, value: int) -> None:
         self._check_frozen("write_schema")
         self._write_schema = value
 
-    @cache
     def _semaphore_factory(
         self, operation: Literal["read", "write", "delete", "search", "read_schema", "write_schema"], project: str
     ) -> asyncio.BoundedSemaphore:
-        # We include 'project' in the cache, since concurrency limits should apply per-project
+        key = (operation, project, asyncio.get_running_loop())
+        if key in self._semaphore_cache:
+            return self._semaphore_cache[key]
         from cognite.client import global_config
 
         global_config.concurrency_settings._freeze()  # Disallow any further changes
         match operation:
             case "read":
-                return asyncio.BoundedSemaphore(self.read)
+                sem = asyncio.BoundedSemaphore(self.read)
             case "write":
-                return asyncio.BoundedSemaphore(self.write)
+                sem = asyncio.BoundedSemaphore(self.write)
             case "delete":
-                return asyncio.BoundedSemaphore(self.delete)
+                sem = asyncio.BoundedSemaphore(self.delete)
             case "search":
-                return asyncio.BoundedSemaphore(self.search)
+                sem = asyncio.BoundedSemaphore(self.search)
             case "read_schema":
-                return asyncio.BoundedSemaphore(self.read_schema)
+                sem = asyncio.BoundedSemaphore(self.read_schema)
             case "write_schema":
-                return asyncio.BoundedSemaphore(self.write_schema)
+                sem = asyncio.BoundedSemaphore(self.write_schema)
             case _:
                 assert_never(operation)
+        self._semaphore_cache[key] = sem
+        return sem
 
     def __repr__(self) -> str:
         return (
@@ -216,7 +225,7 @@ class ConcurrencySettings:
     The total concurrency budget, i.e. the maximum number of concurrent requests in flight,
     is the sum of all categories (e.g. general) and operation types (e.g. read or write).
 
-    See: https://cognite-sdk-python.readthedocs-hosted.com/en/v8/settings.html#concurrency-settings
+    See: https://cognite-sdk-python.readthedocs-hosted.com/en/latest/settings.html#concurrency-settings
 
     Note:
         The settings apply on a per-project level, thus if you have multiple clients
@@ -258,7 +267,7 @@ def _check_frozen(self, name: str, api_name: str) -> None:
             raise RuntimeError(
                 f"Cannot modify '{api_name}.{name}' after concurrency settings have been used to create semaphores. "
                 "Concurrency settings must be configured before sending any API requests. "
-                "See: https://cognite-sdk-python.readthedocs-hosted.com/en/v8/settings.html#concurrency-settings"
+                "See: https://cognite-sdk-python.readthedocs-hosted.com/en/latest/settings.html#concurrency-settings"
             )
 
     def _freeze(self) -> None:
@@ -508,73 +517,20 @@ def _raise_specific_error(
 
 
 class EventLoopThreadExecutor(threading.Thread):
-    def __init__(self, loop: asyncio.AbstractEventLoop | None = None, daemon: bool = True) -> None:
+    def __init__(self, daemon: bool = True) -> None:
         super().__init__(name=type(self).__name__, daemon=daemon)
-        self._inside_jupyter = self._detect_jupyter()
-        self._event_loop = self._patch_loop_for_jupyter(loop) or asyncio.new_event_loop()
-
-    @staticmethod
-    def _detect_jupyter() -> bool:
-        try:
-            from IPython import get_ipython  # type: ignore [attr-defined]
-
-            return "IPKernelApp" in get_ipython().config
-        except Exception:
-            return False
-
-    def _patch_loop_for_jupyter(self, loop: asyncio.AbstractEventLoop | None) -> asyncio.AbstractEventLoop | None:
-        """
-        From the 'nest_asyncio' package: By design asyncio does not allow its event loop to be nested. This presents a
-        practical problem: When in an environment where the event loop is already running it's impossible to run
-        tasks and wait for the result.
-        """
-        if not self._inside_jupyter:
-            return loop
-
-        if loop is None:
-            try:
-                import nest_asyncio  # type: ignore [import-not-found]
-            except ImportError:
-                raise CogniteImportError(
-                    module="nest_asyncio",
-                    message="Inside Jupyter notebooks, the 'nest_asyncio' package is required if you want to use the "
-                    '"non-async" CogniteClient. This is because Jupyter already runs an event loop that we need '
-                    "to patch (`pip install nest_asyncio`). Alternatively, you can use the AsyncCogniteClient "
-                    "which does not require any extra packages, but requires the use of 'await', e.g.: "
-                    "`dps = await async_client.time_series.data.retrieve(...)`",
-                ) from None
-            try:
-                # Jupyter: reuse the already running loop but patch it:
-                loop = asyncio.get_running_loop()
-                nest_asyncio.apply(loop)
-                return loop
-            except RuntimeError:
-                return None  # this would be very unexpected
-        else:
-            warnings.warn(
-                RuntimeWarning(
-                    "Overriding the event loop is not recommended inside Jupyter notebooks "
-                    "since Jupyter already runs an event loop. Proceeding with the provided loop anyway, "
-                    "beware of potential issues."
-                )
-            )
-            return loop
+        self._event_loop = asyncio.new_event_loop()
 
     def run(self) -> None:
-        if not self._inside_jupyter:
-            asyncio.set_event_loop(self._event_loop)
-            self._event_loop.run_forever()
+        asyncio.set_event_loop(self._event_loop)
+        self._event_loop.run_forever()
 
     def stop(self) -> None:
-        if not self._inside_jupyter:
-            self._event_loop.call_soon_threadsafe(self._event_loop.stop)
-            self.join()
+        self._event_loop.call_soon_threadsafe(self._event_loop.stop)
+        self.join()
 
-    def run_coro(self, coro: Coroutine[_T, Any, _T], timeout: float | None = None) -> _T:
-        if self._inside_jupyter:
-            return asyncio.get_event_loop().run_until_complete(coro)
-        else:
-            return asyncio.run_coroutine_threadsafe(coro, self._event_loop).result(timeout)
+    def run_coro(self, coro: Coroutine[Any, Any, _T], timeout: float | None = None) -> _T:
+        return asyncio.run_coroutine_threadsafe(coro, self._event_loop).result(timeout)
 
 
 class _PyodideEventLoopExecutor:
@@ -609,13 +565,11 @@ def _get_event_loop_executor() -> EventLoopThreadExecutor:
         return _INTERNAL_EVENT_LOOP_THREAD_EXECUTOR_SINGLETON
     except NameError:
         # First time we need to initialize:
-        from cognite.client import global_config
-
         ex_cls = EventLoopThreadExecutor
         if _RUNNING_IN_BROWSER:
             ex_cls = cast(type[EventLoopThreadExecutor], _PyodideEventLoopExecutor)
 
-        executor = _INTERNAL_EVENT_LOOP_THREAD_EXECUTOR_SINGLETON = ex_cls(global_config.event_loop)
+        executor = _INTERNAL_EVENT_LOOP_THREAD_EXECUTOR_SINGLETON = ex_cls()
         executor.start()
         return executor
 

docs/source/settings.rst

Lines changed: 17 additions & 0 deletions
@@ -168,6 +168,23 @@ Alternatively, you can toggle debug logging on or off dynamically by setting the
 
 Note: Large outgoing or incoming payloads will be truncated to 1000 characters in the logs to avoid overwhelming the log output.
 
+Custom event loop (e.g. uvloop)
+-------------------------------
+The SDK automatically creates and manages a background ``asyncio`` event loop for the synchronous ``CogniteClient``.
+By default this uses Python's built-in ``asyncio`` event loop, but you can substitute any compatible implementation
+(such as `uvloop <https://github.com/MagicStack/uvloop>`_) by installing a custom loop *policy* before the first API
+call is made. The background loop is created via ``asyncio.new_event_loop()``, which respects the active policy.
+
+.. code:: python
+
+    import uvloop
+
+    uvloop.install()  # equivalent to asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
+
+    from cognite.client import CogniteClient
+
+    client = CogniteClient(...)  # background loop will now be a uvloop
+
 HTTP Request logging
 --------------------
 Internally this library uses the ``httpx`` library to perform network calls to the Cognite API service endpoints. For authentication and

mypy.ini

Lines changed: 3 additions & 0 deletions
@@ -25,3 +25,6 @@ ignore_missing_imports = true
 
 [mypy-sympy.*]
 ignore_missing_imports = true
+
+[mypy-testbook.*]
+ignore_missing_imports = true
